Scheduling

  • 3572 words
  • Estimated Reading Time: 17 minutes

Feeling like a Zombie#

Working on hobby projects sometimes give me the feeling that I’m a zombie in a low budget horror flick. Slowly shuffling aimlessly, hoping to score some sweet brains. Not accomplishing anything.

For me the primary challenge on trying to build anything always comes down to time. On a good week I can spend 90 minutes a day, Monday through Friday, on hobbies. Saturday and Sunday I’m able to spend up to 4 hours apiece.

However, that’s best case scenario. In practice it’s often closer to 45 minutes a day that I get to spend my creativity on something I’m interested in. On a bad day, it’s zero. There’s been a lot of bad days.

This post details something that I originally intended to spend a week on. But, life happened. Time became scare and next thing I know 6 months went by.

Zombie shuffle.

The Challenge#

My current hobby project is building a prototyping environment for exploring intelligent agents. The current milestone I’ve been chipping away at is deciding on the general pattern for how to organize the update logic for a simulation.

Simulations run in a loop. Each iteration of the loop updates the simulation by a certain amount. This amount is referred to as a tick.

Video games are a type of simulation. They try to simulate at a fixed rate per second or frame rate. A common frame rate is 60 frames per second. For my purposes, one frame is a simulation tick.

A simple simulation loop (or game loop) works like this:

def loop():
  while SimulationState.NOT_PAUSED:
    handle_user_io(scene)
    update_physics(scene)
    update_agents(scene)
    ...
    update_graphics(scene)

The main loop in a game is an architecturally significant aspect of any game. If not handled appropriately, a bad loop can result in dropped frames, simulation speed ups or slow downs, and a frustrated user.

Given it’s importance there’s been a lot of research on the topic. Bob Nystrom has a good write up of the typical concerns when designing a game loop in his Game Programming Patterns book.

A deeper dive is available in the highly recommended book Game Engine Architecture by Jason Gregory.

I also found these resources helpful:

1st Attempt: Event Scheduling#

As I was researching different ways to construct the main simulation loop, I stumbled on a paper by David “Rez” Graham called Efficient, Event-Based Simulations

Here is his original tweet.

In his paper Rez demonstrates how to leverage a priority scheduling algorithm as an alternative to the classic main loop pattern.

I like the idea of scheduling things to happen in the future and letting a system sort it out rather than calling an update method on every simulation entity every frame. At its core, the idea seems elegant. So I decided to kick the tires on building a scheduling based simulation loop.

Rez’s paper is great for introducing the idea of using a scheduler in a game or simulation but it doesn’t go into the details of how to incorporate one into a larger piece of software. To think through that, I started looking at how operating systems leverage schedulers.

For an introduction to the topic, I found the Wikipedia article on operating systems scheduling to be helpful.

Once I had a basic understanding of the basics of scheduling algorithms, Marilyn Wolf’s lecture on real-time scheduling models helped me get the insight into actually building one.

For my first attempt at building a scheduler I made a few decisions.

  1. A job is defined as a Python callable (i.e. function or method) or generator.
  2. Scheduling should be prioritized on running a job at a specified time in the future.
  3. Time is specified in milliseconds.

Here is my first attempt at a job scheduler.

import itertools
TimeInMS = int
ScheduledJobId = int

class JobScheduler(Generic[PriorityItem]):
  def __init__(self) -> None:
    self._jobs_queue: PriorityQueue = PriorityQueue()
    self._job_counter = itertools.count()

  # Runs all jobs that are scheduled for the current time window.
  def run_due_jobs(self, duration: TimeInMS):
    current_time_ms: TimeInMS = self._current_time()
    scheduled_time = current_time_ms + duration

    while len(self._jobs_queue) > 0 and self._jobs_queue.jobs_due(scheduled_time):
      run_time: float
      job: PriorityItem
      job_data: Optional[dict]
      run_time, job, job_data = self._jobs_queue.pop()
      CallableUtility.invoke(job, job_data)

  # Schedules a job to run in the future.
  def schedule(self,
    job: Callable,
    scheduled_time: TimeInMS,
    job_data: Optional[dict] = None) -> ScheduledJobId:
    job_id = self._generate_job_id()
    self._jobs_queue.push(job, job_id, scheduled_time, job_data)
    return job_id

  # Removes a job from the scheduler without executing it.
  def cancel(self, job_id: ScheduledJobId):
    if job_id in self._jobs_queue:
      self._jobs_queue.remove(job_id)

  # Updates the time to run a specific job.
  def reschedule(self, job_id: ScheduledJobId, new_scheduled_time: TimeInMS):
    if job_id in self:
      queued_item_bundle:Optional[PriorityItemDecorator] = self._jobs_queue.index(job_id)
      if queued_item_bundle is not None:
        self._jobs_queue.push(queued_item_bundle.item, job_id, new_scheduled_time)

  # Returns the time of the job is schedule.
  def scheduled(self, job_id: ScheduledJobId) -> TimeInMS:
    if job_id in self:
      bundled_item: Optional[PriorityItemDecorator] = self._jobs_queue.index(job_id)
      return bundled_item.priority if bundled_item else -1
    else:
      raise KeyError(f'Job {job_id} not scheduled.')

  def _current_time(self) -> TimeInMS:
    return TimeUtilities.now()

  def _generate_job_id(self) -> ScheduledJobId:
    """Generates a unique ID for identifying a scheduled job."""
    return next(self._job_counter)

  # Enable assertions of the form: assert my_job_id in job_scheduler
  def __contains__(self, job_id: ScheduledJobId) -> bool:
    return job_id in self._jobs_queue

The main thing to understand about the JobScheduler is that it uses a priority queue to organize pending jobs based on when they’re suppose to run. Building a priority queue is beyond the scope of this article. See the full source code for how I do that if you’re curious.

Here is an example of how the JobScheduler is used.

MS_PER_SEC = 1000
js = JobScheduler()

# Schedule jobs to be run in the future in 1 second, 5 seconds,
# and in 10 seconds
js.schedule(job1, 1 * MS_PER_SEC)
js.schedule(job2, 5 * MS_PER_SEC)
js.schedule(job3, 10* MS_PER_SEC)

# Run everything scheduled for the next 1500 milliseconds and older
js.run_due_jobs(MS_PER_SEC * 1.5)

If we’re not careful the scheduler will monopolize the CPU by constantly checking for jobs to run. To avoid this, I use the strategy of having each simulation tick search for jobs that can run in the given time slice. If there are any, run them. If there is time remaining until the next simulation tick begins then sleep the thread to give the main process room to do other things.

while self.simulation_state is not SimulationState.ENDED:
  if self.simulation_state is SimulationState.RUNNING:
    time_to_render:TimeInMS = loop_stats['start_of_cycle'] + UPDATE_BUDGET
    search_window: TimeInMS = time_to_render - loop_stats['start_of_cycle']
    self._scheduler.run_due_jobs(search_window)

    # Is there any time until we need to render?
    # If so, then sleep until then.
    break_time: TimeInSecs = (time_to_render - TimeUtilities.now())/MS_PER_SEC
    if break_time > 0:
      sleep(break_time)

    self._render()
  else:
    # The simulation isn't running so don't keep checking it.
    sleep(TIME_UNTIL_NEXT_CHECK)

Transitioning to Coroutines#

The time based job scheduler works. I tried building a few small simulations with it and it seems to do the job. I’m not happy with it though. I find that trying to structure simulation logic to happen in the future for everything isn’t intuitive for me. So I kept researching other techniques for organizing simulation logic.

While building the job scheduler I kept thinking about how SimPy enables discrete-event style simulation by leveraging Python’s generators and coroutines.

At the same time I was reading Game Engine Architecture by Jason Gregory and found the reference to Christian Gylrling’s 2015 GDC talk Parallelizing the Naughty Dog Engine Using Fibers.

Gylrling’s talk is excellent and I recommend watching it for insight into how folks where exploring maximizing multiple cores back in 2015 by distributing coroutines across the hardware with a task scheduler.

A few other talks that helped me gain more perspective are:

As excellent as all of these talks are, they’re not directly applicable to my situation. For starters I’m building a prototyping environment not a triple-A game engine. Secondly, I’m working in Python not C++. Lastly, I’m working all by my lonesome on this. I don’t have a team of developers and testers to build a robust engine.

That said, I thought there might be something to using coroutines for organizing game logic. First though, what’s a coroutine?

Thinking Like a Snake: Python Concepts#

If you’re not familiar, Python is a dynamically typed language. All variables have type but they’re not identified by the python interpreter until runtime.

PEP 484 introduced the ability to define type hints in code. The interpreter doesn’t leverage this but it’s helpful for tools like linting and static analysis and in my opinion makes the code more readable. Here is a simple example.

# Plain old Python.
x = 5
y = 22.147
q = 'Man, I wish I knew what the types of my variables are.'

# Python with type hints.
x: int = 5
y: float = 22.147
q: str = 'Type hints are helpful.'

Items that can be invoked can be identified as Callable. In this example you can see how the type of a function passed as a parameter to another function can be declared.

from typing import Callable
def run_another_function(another_function: Callable) -> None:
  another_function()

Functions, lambdas, classes, and object instances (in some cases) are all callables. We can verify this with the callable function.

# Functions are callable.
def my_function():
  print('hi')

assert callable(my_function)

# Lambda's are callable.
my_lambda = lambda: 'hi'
assert callable(my_lambda)

# Classes are callable.
class MyClass:
  pass

assert callable(MyClass)

# Classes that have __call__ defined can be used to
# create callable instances.
class CallableObject:
  def __call__(self) -> None:
    pass

my_instance = CallableObject()
assert callable(my_instance)

Generators and Coroutines are also callables but have more nuance in their definitions.

# A generator is a function that can suspend it's execution
# using the yield statement.
def count_up(starting_value):
  while True:
    yield starting_value
    starting_value += 1

# Initialize the generator.
cu = count_up(5)

# Run a tick of the generator.
print(next(cu)) # Prints 5
print(next(cu)) # Prints 6
print(next(cu)) # Prints 7

# The generator is callable.
assert callable(count_up)

A generator uses the yield statement to suspend the generators execution and can optionally return a value. Coroutines can do this but they can also be sent data when they’re resumed.

# Define a simple coroutine that prints dynamic messages.
def message_printer(intro):
  print('The coroutine has started.')
  try :
    while True:
      msg = (yield)
      print(f'{intro} {msg}')
  except GeneratorExit:
    print("The coroutine is shutting down.")

# Initialize the coroutine.
mp = message_printer('Hi')

# Advance the coroutine to the yield statement.
next(mp) # Prints to STDOUT "The coroutine has started."

# Resume the coroutine at the yield statement and pass in a message.
mp.send('Sam') # Prints to STDOUT "Hi Sam".
mp.send('Linda') # Prints to STDOUT "Hi Linda".

# Terminate the coroutine with the close command.
mp.close() # Prints to STDOUT "The coroutine is shutting down."

# Once the coroutine is closed, attempting to send it more data will
# throw a StopIteration error.

Generators and coroutines are powerful tools. To get an exhaustive walk through of what you can do with them see David M. Beazley’s talks on generators and coroutines on the subject.

Dreams of Parallelism#

The GDC presentations about Naughty Dog’s and Destiny’s engines got me dreaming about having coroutines that could be distributed across CPU cores and run in parallel.

Now to be clear, I don’t really need to do this. Remember I’m trying to build an environment to enable prototyping. But it’s a fun idea, and ultimately I’m working on this as a hobby so I spent some cycles researching the viability of it.

Yeah… It’s not viable with the current state of vanilla python. The problem can be summarized as:

  1. Python runs in a single operating system process.
  2. When a Python application spawns a thread it’s bound in that process. That means Python threading enables concurrency (i.e. context switching) in a single process.
  3. To achieve parallelism, a program must spawn multiple python processes and communicate between them by sending serialized (i.e. strings) messages.
  4. You can spin up a coroutine in a process but to suspend it and resume it in another process would require you serialize it, store it, and then deserialize it on the process that has the bandwidth to run it.
  5. You can’t serialize a generator or coroutine.

Alexandre Vassalotti explains some of the nuance as to why we can’t serialize generators and coroutines in his 2009 blog post.

So out of the box this isn’t possible. Out of curiosity, I spent a little (and I mean little) time researching enabling this as a C extension and it turns out Dale Weiler has had success doing this.

Ultimately, I decided I’m happy living in single threaded land. ;)

2nd Attempt: Task Scheduling#

For my second attempt at organizing the simulation logic I leveraged coroutines and a tiered priority queue but I also introduced several new concepts.

All the Things#

Conceptually, the “things” in a simulation are organized into three categories:

  • Agents: Something that is simulating a living, thinking being that has some degree of agency.
  • Paths: A curve, line or circle that an constrains an agent’s movement.
  • Entity: Everything else. Both static and interactive items that may have associated logic but does not have agency (e.g. rocks, walls, doors). Entities can be grouped into groupings (e.g. all the rocks) to enable batch processing of a particular entity type.
  • Task: A piece of logic that can be associated to one of more agents or entities.
  • Renderer: A piece of logic responsible for drawing an agent, entity, or path.

I’m sure this classification will expand.

Scene Files#

The simulations that leverage the Job Scheduler are implemented 100% in Python. These became unwieldy rather quickly. To help reduce this, I transitioned to a declarative approach for defining simulation “scenes”.

Now scenes are declared with a combination of TOML and Python files. A scene can have the organization of the simulation (agents, paths, entities) declared in a scene file and associated logic can be implemented in Python as registered functions (tasks, renderers).

The snippet below is simple scene that declares a pulsing circle in it.

# Define aspects about the simulations UI.
[simulation.ui]
title = 'Pulsing Circle Simulation'
description = 'A single circle that oscillates in size.'
instructions = 'Click the start button to begin the simulation.'

# Register the Python functions that will be associated
# to Agents or Entities.
# The functions are implemented separately.
[registered_functions]
renderers = ['simple_circle_renderer']
tasks = ['pulse_circle_coroutine']

# Define scene specific details.
[scene]
cell_size = [20, 20]

# Define a circle that isn't alive (i.e an entity) but can pulsate.
[[scene.entities.circles]]
id = 1
description='pulsing circle'
default_radius = 20
active_radius = 20
scale = 10
location=[100, 100]
color=[0, 0, 0]
fill=[0, 0, 255]
renderer='simple_circle_renderer'
update_method='update_active_radius'

# Schedule the circle's associated task to be ran every frame.
[[scene.schedule]]
coroutine = 'pulse_circle_coroutine'
circle_id = 1
speed = 0.100

TOML isn’t a perfect solution. It helps organize a simulation’s structure but it can easily grow quite large. A proper engine would include visual tools for working with the scene files, but since this is just a prototyping environment it meets my needs.

Tasks#

All items can have logic associated with them that should be executed on some frequency. For now I’m calling this type of logic a task. A task is defined by having the following characteristics.

  • It can maintain an internal state.
  • It can subdivide work into smaller tasks.
  • It can be scheduled.
  • It can be suspended and resumed.

The scene declared above associates a task with a circle entity that will make the circle pulse its radius. This task is implemented like so.

# A task is a coroutine that can accept both positional and named parameters.
def pulse_circle_coroutine(*args, **kwargs):
  logger.info('pulse_circle: Starting task.')

  # Tasks can be initialized with data.
  scene = kwargs['scene']
  circle_id = kwargs['circle_id']
  speed: float = kwargs['speed']
  circle = scene.entities['circles'][circle_id]

  if circle:
    try:
      while True:
        # Per Frame Logic
        inflate_amount:float = 0.5*(1+sin(2 * pi * perf_counter()))
        circle.active_radius = circle.default_radius + circle.scale * inflate_amount

        # Tell the Task Scheduler to run this again next frame.
        yield ScheduleTraps.NEXT_FRAME
    except GeneratorExit:
      logger.info('Task: pulse_circle - GeneratorExit')
    finally:
      logger.info('Task: pulse_circle - Task Completed')
  else:
    raise Exception(f"Could not find circle: {circle_id}")

Coroutines are the mechanism that I use for encapsulating business logic. The coroutines themselves need to be passed around and managed. To do this I created the Task class.

@dataclass
class Task:
  # A unique ID to reference the task by.
  task_id: TaskId

  # The ID of a blocked task. Used if the task has a parent that is
  # waiting for it to finish.
  parent_id: Optional[TaskId]

  # Can be a pointer to a function or a generator that hasn't
  # been initialized.
  task_ref: Callable

  # Positional parameters for the task.
  args: List[Any]

  # Named parameters for the task.
  kwargs: Dict[str, Any]

  # The number of tasks (descendants) this task needs to
  # complete before it can be run again.
  waiting_on_count: Counter = field(
    init=False,
    default_factory=pending_task_counter
  )

  # Indicates if the task has been initialized.
  initialized: bool = field(init=False, default=False)

  # The task logic to be run.
  coroutine: Optional[Generator] = field(init=False, default=None)

  def reduce_task_dependency(self) -> None:
    self.waiting_on_count.decrement()

  def read_to_run(self) -> bool:
    return self.waiting_on_count.at_min_value()

The Task class is responsible for storing the metadata about a task. An instance of a Task isn’t very useful on its own. In order for a simulation to leverage running a task it needs the TaskScheduler.

Note
Currently tasks may only have a single parent. Time may reveal that design is too limiting. However, I’ll wait to address supporting multiple parents when I encounter a use case that requires it.

Task Scheduler#

The Task Schedule is the spiritual successor to the Job Scheduler. Similar design, a bit more complicated implementation, but ultimately easier to build simulations with.

It is basically composed of the following.

import select

# A gross simplification of the Task Scheduler.
class TaskScheduler:
  def __init__(self) -> None:
    self._registered_tasks_counter = Counter(start=0)
    self._pending_tasks_counter = Counter(start=0, increment_step=1, min_value=0)
    self._tasks_store: dict[TaskId, Task] = dict()
    self._ready_to_initialize_queue: = PollingQueue(self._initialize_task)
    self._ready_to_resume_queue: = PollingQueue(self._resume_task)
    self._hold_for_next_frame: Deque[TaskId] = deque()

  # Handles registering tasks with the scheduler.
  # Some code omitted for clarity.
  def add_task(self,
    task: Callable,
    args: List[Any] = [],
    kwargs: Dict[str, Any] = {},
    parent_id: Optional[TaskId] = None,
    priority: TaskPriority = TaskPriority.NORMAL) -> TaskId:

    # Build a proper Task and store it.
    task_id: Union[int, float] = self._registered_tasks_counter.increment()
    self._pending_tasks.increment()
    self._tasks_store[task_id] = Task(task_id, parent_id, task, args, kwargs)

    # If this new task is a dependent of an previously registered task, then
    # increase the reference counter on the parent.
    if parent_id in self._tasks_store:
      self._tasks_store[parent_id].waiting_on_count.increment()

    # Queue up the task to be worked on.
    self._ready_to_initialize_queue.append(task_id)
    return task_id

  # Handle processing items on the _ready_to_initialize_queue queue.
  def _initialize_task(self, task_id: TaskId) -> None:
    # Code omitted for clarity
    # ...

  # Handle processing items on the _ready_to_resume_queue queue.
  def _resume_task(self, task_id: TaskId) -> None:
    # Code omitted for clarity
    # ...

  # Simplified version of the consume method.
  # Responsible for processing all the tasks in a single frame.
  # Omitted error handling and profiling code for clarity.
  def consume(self):
    while not self._stopped and self._pending_tasks.value() > 0:
      # Poll both of the two queues responsible for serving up work.
      can_read, _, _ = select.select(
        [
          self._ready_to_initialize_queue,
          self._ready_to_resume_queue
        ], [], [])

      # Run the tasks that are read to run.
      for q in can_read:
        q.process_item()

  # Simplified version of the consume method.
  # Responsible for queueing up work for the next frame.
  def queue_holding_tasks(self) -> None:
    while len(self._hold_for_next_frame) > 0:
      task_id = self._hold_for_next_frame.pop()
      self._ready_to_resume_queue.append(task_id)
      self._pending_tasks.increment()

As we can see in the code above, the TaskScheduler is composed of two custom counters, a dictionary, two custom queues called PollingQueues, and a Deque.

When a simulation is loaded all of the tasks are registered with the TaskScheduler. Registering a task has the effect of storing it in the tasks_store dictionary and then adding its taskid to _ready_to_initialize_queue. Once the simulation is running, the following algorithm is performed each frame.

  1. Invoke the task scheduler’s queue_holding_tasks method. This performs the actions:

    1. Poll the hold_for_next_frame queue.
    2. If there are any tasks held from the last frame tee them up by popping from the hold_for_next_frame queue and pushing to the ready_to_resume_queue queue.
  2. Invoke the task scheduler’s consume method. This performs the actions:

    1. Poll the ready_to_initialize_queue and the ready_to_resume_queue queues.
    2. If there is a task_id in either queue process it is processed with the associated callback on the queue.
    3. For both queue callbacks, if the task should be run again in the next frame its taskid is added to the _hold_for_next_frame queue.

The two counters are used for different things.
Registered_tasks_counter always counts up and is responsible for generating an ID for the tasks when they’re registered.

Pending_tasks_counter is used for counting the number of tasks that can be run. This is used rather than constantly checking the length of the two polling queues. The _pending_tasks_counter variable can never drop below zero.

Limitations
Once the TaskScheduler starts consuming tasks from it’s queues it doesn’t stop until its done. Potentially, it could have a time limit that it can use to prevent dropped frames. I haven’t implemented that because so far, its not an issue.

Additionally, I think there may be a need for prioritizing tasks, but waiting on a use case to materialize before adding that complexity.

Conclusions#

After all that research and experimenting I’ve landed on a design that I can live with (for now).

Until next time…

  • Sam