Class: Tins::Limited

Inherits:
Object
Defined in:
lib/tins/limited.rb

Overview

Tins::Limited provides a thread pool implementation that limits how many threads run concurrently.

This class implements a producer-consumer pattern: you submit tasks, and an executor thread runs each one in its own worker thread while keeping at most maximum workers running at once. This prevents resource exhaustion from too many concurrent operations.

Examples:

Basic usage

limited = Tins::Limited.new(3)  # Limit to 3 concurrent threads

limited.process do |l|
  10.times do
    l.execute { puts "Task #{Thread.current.object_id}" }
  end
  l.stop  # Stop processing new tasks
end

With a named executor thread

limited = Tins::Limited.new(5, name: 'worker_pool')
# The executor thread is named 'worker_pool' once process starts

Constructor Details

#initialize(maximum, name: nil) ⇒ Limited

Create a Limited instance that runs at most maximum threads simultaneously.

Parameters:

  • maximum (Integer)

    The maximum number of concurrent worker threads

  • name (String, nil) (defaults to: nil)

    Optional name, assigned to the executor thread when process starts

Raises:

  • (ArgumentError)

    if maximum is less than 1

  • (TypeError)

    if maximum cannot be converted to Integer
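
The two checks above can be tried in isolation. The following is a minimal sketch, not part of Tins: validate_maximum and error_class_for are hypothetical helpers that only repeat the Integer() conversion and the range check from the constructor. Note that Kernel#Integer raises TypeError for values such as nil, but ArgumentError for a string that is not a number.

```ruby
# Hypothetical helper (not part of Tins) repeating the two checks
# performed in Tins::Limited#initialize.
def validate_maximum(maximum)
  value = Integer(maximum) # Kernel#Integer raises on unconvertible input
  raise ArgumentError, "maximum < 1" if value < 1
  value
end

# Hypothetical helper: returns the class of the exception raised for
# the given input, or nil if the input is accepted.
def error_class_for(input)
  validate_maximum(input)
  nil
rescue StandardError => e
  e.class
end

validate_maximum(3)    # => 3
validate_maximum("4")  # => 4
error_class_for(0)     # => ArgumentError
error_class_for(nil)   # => TypeError
error_class_for("abc") # => ArgumentError
```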



# File 'lib/tins/limited.rb', line 32

def initialize(maximum, name: nil)
  @maximum  = Integer(maximum)
  raise ArgumentError, "maximum < 1" if @maximum < 1
  @mutex    = Mutex.new
  @continue = ConditionVariable.new
  @name     = name
  @count    = 0
  @tg       = ThreadGroup.new
end

Instance Attribute Details

#maximum ⇒ Integer (readonly)

The maximum number of worker threads that can run concurrently.

Returns:

  • (Integer)

    The maximum concurrent thread limit



# File 'lib/tins/limited.rb', line 45

def maximum
  @maximum
end

Instance Method Details

#create_executor ⇒ Thread (private)

Create and start the executor thread that manages the worker pool.

Returns:

  • (Thread)

    The executor thread



# File 'lib/tins/limited.rb', line 104

def create_executor
  Thread.new do
    @mutex.synchronize do
      loop do
        if @count < @maximum
          task = @tasks.pop
          @count += 1
          Thread.new do
            @tg.add Thread.current
            task.(Thread.current)
          ensure
            @count -= 1
            @continue.signal
          end
        else
          @continue.wait(@mutex)
        end
      end
    end
  end
end
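
The counting scheme used by the executor (start a worker while fewer than maximum are running, otherwise wait on a condition variable until a worker finishes) can be sketched on its own with Ruby's Mutex and ConditionVariable. This is an illustrative variation, not the library's code: the variable names are made up for the example, the spawning loop stands in for the executor thread, and unlike the listing above the sketch also takes the mutex when a worker decrements the counter.

```ruby
maximum = 3   # assumed limit for this example
mutex   = Mutex.new
slot    = ConditionVariable.new
running = 0   # workers currently alive
peak    = 0   # highest value of running ever observed
results = Queue.new

threads = 10.times.map do |i|
  # Claim a slot before spawning; block while the pool is full.
  mutex.synchronize do
    slot.wait(mutex) while running >= maximum
    running += 1
    peak = [peak, running].max
  end

  Thread.new do
    begin
      sleep 0.01          # stand-in for real work
      results << i
    ensure
      # Release the slot and wake the spawning loop.
      mutex.synchronize do
        running -= 1
        slot.signal
      end
    end
  end
end

threads.each(&:join)
puts "peak concurrency: #{peak}, tasks finished: #{results.size}"
```

Because the slot is claimed before each Thread.new, peak can never exceed maximum, regardless of how the scheduler interleaves the workers.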

#done? ⇒ Boolean (private)

Check if all tasks and threads have completed.

Returns:

  • (Boolean)

    true if no tasks remain and no threads are running



# File 'lib/tins/limited.rb', line 90

def done?
  @tasks.empty? && @tg.list.empty?
end

#execute {|Thread| ... } ⇒ Object

Submit a task to be executed by the thread pool.

Yields:

  • (Thread)

    The worker thread running the task, passed to the task block

Raises:

  • (ArgumentError)

    if called before process has been started



# File 'lib/tins/limited.rb', line 51

def execute(&block)
  @tasks or raise ArgumentError, "start processing first"
  @tasks << block
end

#process {|Limited| ... } ⇒ void

This method returns an undefined value.

Start processing tasks with the configured thread pool.

This method blocks until processing is stopped and all submitted tasks have completed. The provided block is called repeatedly, so it should submit tasks via execute and call stop once there is nothing more to submit.

Yields:

  • (Limited)

    The limited instance for submitting tasks



# File 'lib/tins/limited.rb', line 64

def process
  @tasks    = Queue.new
  @executor = create_executor
  @executor.name = @name if @name
  catch :stop do
    loop do
      yield self
    end
  ensure
    wait until done?
    @executor.kill
  end
end
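
The control flow of process and stop rests on Ruby's catch and throw: the block is yielded in an endless loop, and throw :stop unwinds out of that loop while still running the ensure clause. A minimal standalone sketch of that mechanism follows; it uses no Tins code, and the log and rounds variables are invented for the illustration.

```ruby
log    = []
rounds = 0

catch :stop do
  begin
    loop do
      rounds += 1
      log << "round #{rounds}"
      # Plays the role of calling stop inside the process block.
      throw :stop if rounds == 3
    end
  ensure
    # Runs while the throw unwinds, like the wait/kill step in process.
    log << "cleanup"
  end
end

p log # => ["round 1", "round 2", "round 3", "cleanup"]
```

If the block never throws :stop, the loop never ends, which is why a process block has to call stop at some point.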

#stop ⇒ void

This method returns an undefined value.

Stop submitting new tasks. This throws :stop, so it must be called from within the process block; process then waits for the remaining tasks to complete.



# File 'lib/tins/limited.rb', line 81

def stop
  throw :stop
end

#wait ⇒ void (private)

This method returns an undefined value.

Wait for all threads in the thread group to complete.



# File 'lib/tins/limited.rb', line 97

def wait
  @tg.list.each(&:join)
end
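
Joining every thread in a ThreadGroup, as wait does, can be tried with the standard library alone. The sketch below is an illustration under assumptions, not library code: the gate queue and the workers array are invented so that the threads stay alive until they are released.

```ruby
tg   = ThreadGroup.new
gate = Queue.new # workers block here until released

workers = 2.times.map do
  thread = Thread.new { gate.pop }
  tg.add(thread) # move the thread into the group
  thread
end

during = tg.list.size   # both workers are still blocked on the gate
2.times { gate << :go } # release the workers
tg.list.each(&:join)    # same call as in wait

puts "in group while running: #{during}"
puts "any worker still alive: #{workers.any?(&:alive?)}"
```

In the class itself, done? relies on finished threads no longer appearing in the group's list, so process keeps calling wait until the list and the task queue are both empty.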