Business#

class mobu.services.business.base.Business(*, options, user, events, logger, flock)#

Bases: Generic

Base class for monkey business (one type of repeated operation).

The basic flow for a monkey business is as follows:

  • Run startup

  • In a loop, run execute followed by idle until told to stop

  • When told to stop, run shutdown

Subclasses should override startup, execute, and shutdown to add appropriate behavior. By default, idle waits for idle_time and generally does not need to be overridden. Subclasses should also override close to free any resources allocated in __init__.

All delays should be implemented by calling pause. After each call to pause, the caller should check self.stopping and exit any loops if it is True.
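The flow above can be illustrated with a small stand-in class. This is a minimal sketch, not mobu's implementation: SketchBusiness and CountingBusiness are hypothetical names, the constructor takes only an idle_time rather than the real options, user, events, logger, and flock arguments, and pause is reduced to a plain sleep with no command handling.

```python
import asyncio
from abc import ABC, abstractmethod
from datetime import timedelta


class SketchBusiness(ABC):
    """Hypothetical stand-in for Business (assumption, not mobu code)."""

    def __init__(self, idle_time: timedelta) -> None:
        self.idle_time = idle_time
        self.stopping = False
        self.success_count = 0

    async def startup(self) -> None:
        """Run once before the first iteration."""

    @abstractmethod
    async def execute(self) -> None:
        """Run once per loop iteration."""

    async def shutdown(self) -> None:
        """Run once after the loop ends."""

    async def pause(self, interval: timedelta) -> bool:
        # Simplified: a real pause would also watch for control commands.
        await asyncio.sleep(interval.total_seconds())
        return not self.stopping

    async def idle(self) -> None:
        # Default idle: wait for idle_time via pause.
        await self.pause(self.idle_time)

    async def run(self) -> None:
        await self.startup()
        while not self.stopping:
            await self.execute()
            self.success_count += 1
            await self.idle()
        await self.shutdown()


class CountingBusiness(SketchBusiness):
    """Example subclass that asks to stop after three iterations."""

    def __init__(self) -> None:
        super().__init__(idle_time=timedelta(milliseconds=1))
        self.calls: list[str] = []

    async def startup(self) -> None:
        self.calls.append("startup")

    async def execute(self) -> None:
        self.calls.append("execute")
        if self.calls.count("execute") == 3:
            self.stopping = True

    async def shutdown(self) -> None:
        self.calls.append("shutdown")
```

Running asyncio.run(CountingBusiness().run()) in this sketch records one startup, three execute calls, and one shutdown, in that order.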

Parameters:
  • options (TypeVar(T, bound= BusinessOptions)) – Configuration options for the business.

  • user (AuthenticatedUser) – User with their authentication token to use to run the business.

  • events (Events) – Event publishers.

  • logger (BoundLogger) – Logger to use to report the results of business.

  • flock (str | None) – Flock that is running this business, if it is running in a flock.

options#

Configuration options for the business.

user#

User with their authentication token to use to run the business.

events#

Event publishers.

logger#

Logger to use to report the results of business. This will generally be attached to a file rather than the main logger.

success_count#

Number of successes.

failure_count#

Number of failures.

stopping#

Whether stop has been called and further execution should stop.

flock#

Flock that is running this business, if it is running in a flock.

name#

The name of this kind of business.

Methods Summary

close()

Clean up any allocated resources.

common_event_attrs()

Attributes that are on every published event.

dump()

error_idle()

Pause after an error and before attempting to restart.

execute()

Execute the core of each business loop.

idle()

Pause at the end of each business loop.

iter_with_timeout(iterable, timeout)

Run an iterator with a timeout.

pause(interval)

Pause for up to the given interval, handling commands.

run()

Execute the core business logic.

run_once()

Execute the core business logic, only once.

shutdown()

Perform any cleanup required after stopping.

signal_refresh()

startup()

Run before the start of the first iteration and then not again.

stop()

Tell the running background task to stop and wait for it to finish.

Methods Documentation

async close()#

Clean up any allocated resources.

This should be overridden by child classes to free any resources that were allocated in __init__.

Return type:

None

common_event_attrs()#

Attributes that are on every published event.

Return type:

CommonEventAttrs

dump()#

Return type:

BusinessData

async error_idle()#

Pause after an error and before attempting to restart.

This is called directly by Monkey rather than by the business. It happens outside of run and therefore must handle acknowledging a shutdown request.

Return type:

None

abstract async execute()#

Execute the core of each business loop.

Return type:

None

async idle()#

Pause at the end of each business loop.

Return type:

None

async iter_with_timeout(iterable, timeout)#

Run an iterator with a timeout.

Returns the next element of the iterator on success and ends the iterator on timeout or if the business was told to shut down. (The latter two can be distinguished by checking self.stopping.)

Parameters:
  • iterable (AsyncIterable[TypeVar(U)]) – Any object that supports the async iterable protocol.

  • timeout (timedelta) – How long to wait for each new iterator result.

Yields:

typing.Any – The next result of the iterable passed as iterable.

Raises:

StopIteration – Raised when the iterable is exhausted, a timeout occurs, or the business was signaled to stop by calling stop.

Return type:

AsyncGenerator[TypeVar(U)]

Notes

This is unfortunately somewhat complex because we want to read from an iterator of messages (progress for spawn or WebSocket messages for code execution) while simultaneously checking our control queue for a shutdown message and imposing a timeout.

To do this, create two awaitables: a pause that handles the control queue and the timeout, and a second that waits on the progress iterator. Then use the wait_first utility function to wait for whichever finishes first and abort the other.
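The racing technique described in these notes can be sketched with the standard library alone. This is an illustration under stated assumptions, not the method's actual code: it substitutes asyncio.wait for the wait_first utility, uses a hypothetical asyncio.Event (stop_event) in place of the control queue, and the names iter_with_timeout_sketch, slow_numbers, and demo_iter are invented for the example.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator
from datetime import timedelta
from typing import TypeVar

U = TypeVar("U")


async def _next_item(iterator: AsyncIterator[U]) -> U:
    # Wrap __anext__ in a coroutine so it can be scheduled as a task.
    return await iterator.__anext__()


async def iter_with_timeout_sketch(
    iterable: AsyncIterable[U],
    timeout: timedelta,
    stop_event: asyncio.Event,  # assumption: stands in for the control queue
) -> AsyncIterator[U]:
    iterator = iterable.__aiter__()
    while True:
        next_task = asyncio.create_task(_next_item(iterator))
        stop_task = asyncio.create_task(stop_event.wait())
        # Wait for the first task to finish, or for the timeout to expire.
        done, pending = await asyncio.wait(
            {next_task, stop_task},
            timeout=timeout.total_seconds(),
            return_when=asyncio.FIRST_COMPLETED,
        )
        # Abort whichever task did not finish.
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
        if stop_event.is_set() or next_task not in done:
            return  # told to stop, or timed out
        try:
            item = next_task.result()
        except StopAsyncIteration:
            return  # underlying iterable is exhausted
        yield item


async def slow_numbers() -> AsyncIterator[int]:
    yield 1
    yield 2
    await asyncio.sleep(10)  # far longer than the timeout used below
    yield 3


async def demo_iter(stop_first: bool = False) -> list[int]:
    stop_event = asyncio.Event()
    if stop_first:
        stop_event.set()
    gen = iter_with_timeout_sketch(
        slow_numbers(), timedelta(milliseconds=50), stop_event
    )
    return [n async for n in gen]
```

In this sketch, the third element never arrives within the timeout, so iteration ends after the first two. A caller that needs to tell a timeout from a stop request would check the stop flag afterwards, as the description above suggests for self.stopping.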

async pause(interval)#

Pause for up to the given interval, handling commands.

Parameters:
  • interval (timedelta) – How long to wait.

Returns:

False if the business has been told to stop, True otherwise.

Return type:

bool
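One way to get the "pause for up to the interval, handling commands" behavior is to wait on a command queue with a timeout. The following is a sketch under assumptions: the Command enum, the control queue attribute, and the PausableSketch class are hypothetical and are not taken from mobu.

```python
import asyncio
from datetime import timedelta
from enum import Enum


class Command(Enum):
    """Hypothetical control command (assumption)."""

    STOP = "stop"


class PausableSketch:
    """Hypothetical holder for a control queue and a stopping flag."""

    def __init__(self) -> None:
        self.control: asyncio.Queue[Command] = asyncio.Queue()
        self.stopping = False

    async def pause(self, interval: timedelta) -> bool:
        if self.stopping:
            return False
        try:
            # Wake early if a command arrives before the interval elapses.
            command = await asyncio.wait_for(
                self.control.get(), interval.total_seconds()
            )
        except asyncio.TimeoutError:
            return True  # full interval elapsed with no command
        if command is Command.STOP:
            self.stopping = True
        return not self.stopping


async def demo_pause() -> tuple[bool, bool]:
    biz = PausableSketch()
    uninterrupted = await biz.pause(timedelta(milliseconds=10))
    await biz.control.put(Command.STOP)
    # Returns as soon as the queued command is read, not after 30 seconds.
    interrupted = await biz.pause(timedelta(seconds=30))
    return uninterrupted, interrupted
```

Because the return value mirrors the stopping flag, a loop written as "if not await self.pause(interval): break" exits promptly once a stop has been requested.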

async run()#

Execute the core business logic.

Calls startup, and then loops calling execute followed by idle, tracking failures by watching for exceptions and updating success_count and failure_count. When told to stop, calls shutdown followed by close.

This method is normally run in a background task.

Return type:

None
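The counting and ordering described for run can be sketched as a free function. This is illustrative only: run_loop, LoopStats, and demo_run are hypothetical names, and the sketch keeps looping after a failed execute, which is an assumption; the description above only says that failures are tracked, not what happens next.

```python
import asyncio
from collections.abc import Awaitable, Callable

Step = Callable[[], Awaitable[None]]


class LoopStats:
    def __init__(self) -> None:
        self.success_count = 0
        self.failure_count = 0


async def run_loop(
    startup: Step,
    execute: Step,
    idle: Step,
    shutdown: Step,
    close: Step,
    should_stop: Callable[[], bool],
) -> LoopStats:
    stats = LoopStats()
    await startup()
    while not should_stop():
        try:
            await execute()
        except Exception:
            # Assumption: count the failure and keep going.
            stats.failure_count += 1
        else:
            stats.success_count += 1
        await idle()
    # When told to stop: shutdown first, then close.
    await shutdown()
    await close()
    return stats


async def demo_run() -> tuple[list[str], LoopStats]:
    log: list[str] = []
    attempts = 0

    def step(name: str) -> Step:
        async def _step() -> None:
            log.append(name)

        return _step

    async def execute() -> None:
        nonlocal attempts
        attempts += 1
        log.append("execute")
        if attempts == 2:
            raise RuntimeError("simulated failure")

    stats = await run_loop(
        step("startup"),
        execute,
        step("idle"),
        step("shutdown"),
        step("close"),
        should_stop=lambda: attempts >= 3,
    )
    return log, stats
```

With three attempts and one simulated failure, the sketch ends with two successes and one failure, and the log finishes with shutdown followed by close.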

async run_once()#

Execute the core business logic, only once.

Calls startup, execute, shutdown, and close.

Return type:

None

async shutdown()#

Perform any cleanup required after stopping.

Return type:

None

signal_refresh()#

Return type:

None

async startup()#

Run before the start of the first iteration and then not again.

Return type:

None

async stop()#

Tell the running background task to stop and wait for it to finish.

Return type:

None