Module llms_wrapper.chatbot

Module implementing chatbot functionality for LLMs.

Functions

async def run_async_example(schbot: SerialChatbot)
Expand source code
async def run_async_example(schbot: SerialChatbot):
    """
    Demonstrate the asynchronous interface of FlexibleChatbot.

    Starts the chatbot (bounded by a 5 s timeout) and posts six example
    messages with a short pause after each. It then drains replies through
    the ``chatbot.responses()`` async generator, tallying ok / error / empty
    results. In all cases it finishes with a bounded (15 s) attempt to stop
    the chatbot if its background thread is still alive.
    """
    banner = "=" * 40
    logger.info(f"\n{banner}\n--- Running Async Example ---\n{banner}")
    chatbot = FlexibleChatbot(schbot)

    outgoing = [
        ("Hello chatbot!", dict(author="Ann", msg_id="01")),
        ("Tell me a story?", dict(author="Joe", msg_id="02")),
        ("Another message.", dict(author="Lynn", msg_id="03")),
        ("And yet another message", dict(author="Lynn", msg_id="04")),
        ("And a message from Ann", dict(author="Ann", msg_id="05")),
        ("And one last message", dict(author="Ann", msg_id="06")),
    ]

    try:
        # Guard against start() hanging.
        await asyncio.wait_for(chatbot.start(), timeout=5)
        logger.info("Chatbot started successfully.")

        # listen() only schedules the message; the short sleeps hand control back
        # to the event loop between sends (a slightly longer one after the last).
        logger.info("Sending initial messages in async example...")
        final_idx = len(outgoing) - 1
        for idx, (text, meta) in enumerate(outgoing):
            chatbot.listen(text, meta)
            await asyncio.sleep(0.02 if idx == final_idx else 0.01)

        logger.info("--- Responses (Async) ---")
        received = ok_count = error_count = empty_count = 0
        try:
            async for reply in chatbot.responses():
                received += 1
                if not reply:
                    logger.info("Received (Async): None (timeout or no response).")
                    empty_count += 1
                    continue
                logger.info(f"Received (Async): {reply}")
                if reply.get("is_ok"):
                    ok_count += 1
                else:
                    error_count += 1
        except Exception:
            # Anything raised while iterating the generator ends consumption but
            # still lets the summary and the cleanup below run.
            logger.exception("Exception during async response consumption:")

        logger.info(f"Async consumption loop finished. Received {received} responses, {ok_count} ok, {error_count} errors, {empty_count} timeouts.")

    except (ChatbotError, asyncio.TimeoutError) as err:
        logger.error(f"Failed during async example: {err}")

    finally:
        # Stop the chatbot only if its background thread is actually alive.
        worker = chatbot._loop_thread
        if not (worker and worker.is_alive()):
            logger.info("Async example: Chatbot thread was not running or stopped unexpectedly before explicit stop.")
        else:
            logger.info("Async example: Chatbot thread appears to be running, attempting to stop.")
            try:
                await asyncio.wait_for(chatbot.stop(), timeout=15)
                logger.info("Async example: Chatbot stopped successfully.")
            except asyncio.TimeoutError:
                logger.error("Async example: Chatbot stop operation timed out!")
            except Exception as err:
                logger.exception(f"Async example: Error during chatbot stop: {err}")

    logger.info("--- Async Example Finished ---")
def run_sync_example(schbot: SerialChatbot)
Expand source code
def run_sync_example(
    schbot: SerialChatbot,
    *,
    settle_seconds: float = 20.0,
    max_consecutive_timeouts: int = 10,
):
    """
    Demonstrate driving FlexibleChatbot from purely synchronous code.

    ``asyncio.run`` is used only to await ``start()`` / ``stop()``; message
    processing happens in the chatbot's own background thread. The example:

    1. asks one question and waits (up to 4 s) for its answer;
    2. sends a burst of five more messages;
    3. sleeps ``settle_seconds``;
    4. drains responses with ``get_next_response`` (2 s per poll). Draining
       stops on an error, or once more than ``max_consecutive_timeouts``
       consecutive polls come back empty.

    The chatbot is stopped at the end. Responses whose ``is_ok`` flag is falsy
    are counted as errors, the same way ``run_async_example`` counts them.

    Args:
        schbot: the SerialChatbot that produces the answers.
        settle_seconds: pause between sending the burst and consuming replies.
        max_consecutive_timeouts: consumption ends after more than this many
            empty polls in a row.
    """
    logger.info("\n" + "="*40 + "\n" + "--- Running Sync Example ---" + "\n" + "="*40)
    chatbot = FlexibleChatbot(schbot)

    logger.info("Starting Chatbot for sync use...")
    try:
        # Guard against start() hanging.
        asyncio.run(asyncio.wait_for(chatbot.start(), timeout=5))
        logger.info("Chatbot started successfully for sync use.")
    except (ChatbotError, asyncio.TimeoutError) as e:
        logger.error(f"Failed to start chatbot for sync use: {e}")
        # If the timeout interrupted start(), its background thread is left waiting
        # for a stop signal, so a bare join() could only time out. stop() signals
        # the thread first and then joins it.
        if chatbot._loop_thread and chatbot._loop_thread.is_alive():
            logger.warning("Stopping chatbot thread after failed start.")
            try:
                asyncio.run(asyncio.wait_for(chatbot.stop(), timeout=15))
                logger.debug("Chatbot thread stopped after failed start.")
            except Exception as stop_e:
                logger.exception(f"Error stopping chatbot after failed start: {stop_e}")
        return  # Cannot proceed if start failed

    n_ok = 0
    n_nok = 0
    n_timeouts = 0  # total number of polls that returned nothing

    def count(response) -> None:
        """Tally *response* as ok or error according to its ``is_ok`` flag."""
        nonlocal n_ok, n_nok
        if response.get("is_ok"):
            n_ok += 1
        else:
            n_nok += 1

    chatbot.listen("01 Hello chatbot!", dict(author="Ann", msg_id="01"))
    # Wait for the answer to the first question immediately.
    try:
        response = chatbot.get_next_response(timeout=4.0)
    except Exception:
        logger.exception("Error during get_next_response in sync example for answer to first question!")
        n_nok += 1
    else:
        if response is None:
            logger.info("WEIRD: Received (Sync) for first: None (timeout after 4.0s).")
            n_nok += 1
            n_timeouts += 1
        else:
            logger.info(f"Received (Sync) for first: {response}")
            count(response)

    # Now send all the other messages quickly, then retrieve all the answers.
    chatbot.listen("02 Tell me a story?", dict(author="Joe", msg_id="02"))
    chatbot.listen("03 Another message.", dict(author="Lynn", msg_id="03"))
    chatbot.listen("04 And yet another message", dict(msg_id="04"))
    chatbot.listen("05 And a message from Ann", dict(author="Ann", msg_id="05"))
    chatbot.listen("06 And one last message", dict(author="Ann", msg_id="06"))
    # Give the background loop time to answer the burst; finished responses wait
    # on the chatbot's outgoing queue until they are retrieved below.
    time.sleep(settle_seconds)

    consecutive_timeouts = 0
    while True:
        try:
            response = chatbot.get_next_response(timeout=2.0)  # Timeout per get attempt
        except Exception as e:
            logger.exception(f"Error during get_next_response in sync example for answer to other questions: {e}")
            break
        if response is not None:
            logger.info(f"Received (Sync): {response}")
            count(response)
            consecutive_timeouts = 0
            continue
        logger.info(f"Received (Sync): None {consecutive_timeouts} (timeout after 2.0s).")
        consecutive_timeouts += 1
        n_timeouts += 1
        if consecutive_timeouts > max_consecutive_timeouts:
            logger.info(f"!!!!Stopping sync consumption after {consecutive_timeouts} consecutive timeouts.")
            break
    logger.info(f"============ Sync consumption loop finished. Received {n_ok} responses, {n_nok} errors, {n_timeouts} timeouts.")

    # Ensure the chatbot is stopped after consumption finishes.
    logger.info("Stopping Chatbot after sync use.")
    if chatbot._loop_thread and chatbot._loop_thread.is_alive():
        try:
            # Bound stop(): shutdown includes waiting for in-flight work and queues.
            asyncio.run(asyncio.wait_for(chatbot.stop(), timeout=15))
            logger.debug("Chatbot stopped after sync use.")
        except asyncio.TimeoutError:
            logger.error("Chatbot stop operation timed out!")
        except Exception as e:
            logger.exception(f"Error during chatbot stop for sync use: {e}")
    else:
        logger.warning("Chatbot thread was not alive when stop was called.")

Classes

class ChatbotError (*args, **kwargs)
Expand source code
class ChatbotError(Exception):
    """Error raised by the chatbot machinery (e.g. when FlexibleChatbot cannot start its processing)."""

Custom exception for chatbot errors.

Ancestors

  • builtins.Exception
  • builtins.BaseException
class FlexibleChatbot (serial_chatbot: SerialChatbot)
Expand source code
class FlexibleChatbot:
    """
    A chatbot class that processes messages asynchronously in a background thread
    and allows retrieving responses via either an async generator or a synchronous
    blocking method.
    """
    def __init__(
            self,
            serial_chatbot: SerialChatbot,
    ):
        """
        Wrap a SerialChatbot so it can be driven from a background event loop.

        No thread or event loop is created here; :meth:`start` does that.

        Args:
            serial_chatbot: the SerialChatbot whose ``reply()`` produces answers.
        """
        assert serial_chatbot is not None, "SerialChatbot instance must be provided."
        assert isinstance(serial_chatbot, SerialChatbot), "serial_chatbot must be an instance of SerialChatbot."
        self.chatbot = serial_chatbot

        # Lifecycle state, managed by start()/stop() and the loop thread.
        self._running = False  # whether the chatbot is intended to be running
        self._loop_thread: threading.Thread | None = None  # hosts the private event loop
        self._event_loop: asyncio.AbstractEventLoop | None = None  # the loop running inside _loop_thread
        self._processing_task: asyncio.Task | None = None  # _process_messages_loop task on that loop

        # asyncio primitives used by coroutines on the private loop. Messages flow
        # listen() -> _incoming_queue -> _process_messages_loop -> _outgoing_queue.
        self._incoming_queue = asyncio.Queue()
        self._outgoing_queue = asyncio.Queue()
        self._stop_event = asyncio.Event()  # set by stop() to end the processing loop

        # Serializes the synchronous get_next_response() when it is called from
        # several threads.
        self._get_response_lock = threading.Lock()

    def chatbot_implementation(self, message: str, metadata: Optional[Dict[str,str]] = None):
        # return "Standard response to message: " + message
        return self.chatbot.reply(message, metadata=metadata)

    def _run_loop_in_thread(self):
        """
        Body of the background thread: create, run and finally close the private event loop.

        The loop runs ``_process_messages_loop`` until ``_stop_event`` is set. It
        then performs an ordered shutdown:

        1. give the processing task up to 5 s to drain queued messages
           (``asyncio.wait_for`` cancels it if it overruns);
        2. give consumers up to 2 s to ``task_done()`` every queued response;
        3. cancel and await any tasks still pending on the loop (for example
           ``queue.get`` coroutines scheduled from other threads);
        4. spin the loop for 50 ms so cancellation handlers can finish;
        5. close the loop and clear the shared references that still point at it.

        The loop and the task are kept in locals. ``stop()`` resets
        ``self._event_loop`` even when its join times out, and this thread must
        still be able to finish shutting down and close *its* loop.
        """
        logger.debug("Asyncio loop thread starting.")
        loop = asyncio.new_event_loop()
        self._event_loop = loop
        asyncio.set_event_loop(loop)
        task = None

        try:
            task = loop.create_task(self._process_messages_loop())
            self._processing_task = task
            logger.debug("Asyncio loop running until stop event...")
            # Blocks this thread until stop() sets the event (thread-safely).
            loop.run_until_complete(self._stop_event.wait())
            logger.debug("Asyncio stop event received. Initiating shutdown sequence in loop thread.")

            self._finish_processing_task(loop, task)
            self._wait_for_outgoing_drain(loop)
            self._cancel_remaining_tasks(loop)

            logger.debug("Giving a small moment for final cleanup...")
            try:
                loop.run_until_complete(asyncio.sleep(0.05))
            except Exception:
                pass  # best effort only
        except Exception as e:
            logger.exception(f"Exception during asyncio loop thread shutdown sequence: {e}")
            # Do not close the loop with tasks still pending (e.g. when waiting
            # for the stop event itself failed); cancel them first.
            self._cancel_remaining_tasks(loop)
        finally:
            logger.debug("Closing asyncio event loop.")
            if not loop.is_closed():
                try:
                    loop.close()
                    logger.debug("Asyncio event loop closed.")
                except Exception as e:
                    logger.exception(f"Error closing event loop: {e}")
            else:
                logger.warning("Attempted to close loop, but it was already None or closed.")

            asyncio.set_event_loop(None)
            # Only clear shared references that still belong to this run; a later
            # start() may already have installed new ones.
            if self._event_loop is loop:
                self._event_loop = None
            if self._processing_task is task:
                self._processing_task = None
            logger.debug("Asyncio loop thread finished.")

    def _finish_processing_task(self, loop, task):
        """Shutdown step 1: let the processing task drain queued messages (max 5 s)."""
        logger.debug("Waiting for processing task to finish after stop signal...")
        try:
            # gather(return_exceptions=True) returns the outcome instead of raising
            # it, so a timeout must be detected by inspecting the result.
            (outcome,) = loop.run_until_complete(
                asyncio.gather(asyncio.wait_for(task, timeout=5.0), return_exceptions=True)
            )
        except Exception as e:
            logger.exception(f"Error waiting for processing task to finish: {e}")
            return

        if isinstance(outcome, asyncio.TimeoutError):
            logger.warning("Processing task did not finish within timeout after stop signal.")
            # wait_for already cancels the task on timeout; this is a fallback in
            # case it is somehow still pending.
            if not task.done():
                logger.warning("Cancelling processing task...")
                task.cancel()
                try:
                    loop.run_until_complete(asyncio.gather(task, return_exceptions=True))
                except Exception as e:
                    logger.exception(f"Error during processing task cancellation wait: {e}")
            logger.debug("Processing task cancellation handled.")
        elif isinstance(outcome, BaseException):
            logger.error(f"Error waiting for processing task to finish: {outcome!r}", exc_info=outcome)
        else:
            logger.debug("Processing task finished.")

    def _wait_for_outgoing_drain(self, loop):
        """Shutdown step 2: wait up to 2 s for consumers to task_done() every queued response."""
        logger.debug("Waiting for outgoing queue to drain (max 2s)...")
        try:
            loop.run_until_complete(asyncio.wait_for(self._outgoing_queue.join(), timeout=2.0))
            logger.debug("Outgoing queue drained.")
        except asyncio.TimeoutError:
            logger.warning("Outgoing queue did not drain within timeout.")
        except Exception as e:
            logger.exception(f"Error waiting for outgoing queue to drain: {e}")

    def _cancel_remaining_tasks(self, loop):
        """Shutdown step 3: cancel and await every unfinished task still scheduled on *loop*."""
        logger.debug("Cancelling remaining tasks on the event loop...")
        current = asyncio.current_task(loop)
        leftovers = [t for t in asyncio.all_tasks(loop) if t is not current and not t.done()]
        if not leftovers:
            logger.debug("No pending tasks found to cancel.")
            return
        logger.debug(f"Cancelling {len(leftovers)} pending tasks.")
        for t in leftovers:
            t.cancel()
        try:
            loop.run_until_complete(asyncio.gather(*leftovers, return_exceptions=True))
            logger.debug("Pending tasks cancellation and waiting complete.")
        except Exception as e:
            logger.exception(f"Error waiting for pending tasks cancellation: {e}")

    async def _process_messages_loop(self):
        """
        Consume ``(message, metadata)`` pairs from the incoming queue and publish replies.

        Each message is passed to :meth:`chatbot_implementation` through
        ``asyncio.to_thread``, so the blocking reply does not stall this loop.
        The resulting dict goes onto the outgoing queue unless it is an explicit
        "no answer" (``is_ok`` true and ``answer`` None).

        Failures become error dicts (``answer=None``, ``is_ok=False``, an
        ``error`` text). These carry the original ``message`` and ``metadata``
        so consumers can correlate them, e.g. by ``msg_id``.

        While running, the incoming queue is polled with a 0.1 s timeout so a
        stop request is noticed promptly. Once the stop event is set, the
        remaining queued messages are drained with ``get_nowait`` and the
        coroutine returns.
        """
        logger.debug("Chatbot processing loop started.")
        # Continue until stop has been requested AND nothing is left to process,
        # so messages already queued at stop time are still answered.
        while not self._stop_event.is_set() or not self._incoming_queue.empty():
            try:
                if not self._stop_event.is_set():
                    # Short timeout so an idle loop re-checks the stop event promptly.
                    message, metadata = await asyncio.wait_for(self._incoming_queue.get(), timeout=0.1)
                    logger.debug(f"Chatbot received: '{message}', metadata {metadata}'")
                else:
                    # Draining after stop: never block; QueueEmpty ends the drain.
                    message, metadata = self._incoming_queue.get_nowait()
                    logger.debug(f"Chatbot processing remaining queued: '{message}',  metadata {metadata}'")

                try:
                    try:
                        # reply() blocks, so run it in a worker thread.
                        response = await asyncio.to_thread(self.chatbot_implementation, message, metadata)
                    except Exception as io_e:
                        logger.exception(f"Error during offloaded I/O for '{message}': {io_e}")
                        # Same shape as the error dict below, keeping message/metadata
                        # so the failure can be matched to its request.
                        response = dict(
                            answer=None,
                            error=f"Error processing question: {io_e}",
                            is_ok=False,
                            message=message,
                            metadata=metadata,
                        )

                    if response["is_ok"] and response["answer"] is None:
                        # The chatbot explicitly chose not to answer: publish nothing.
                        pass
                    else:
                        logger.debug(f"Chatbot generated response: {response}")
                        await self._outgoing_queue.put(response)
                        logger.debug(f">>>>>>>>>>>>> Queue size is now {self._outgoing_queue.qsize()} after putting response.")

                except Exception as processing_e:
                    # e.g. reply() returned something without the expected keys.
                    logger.exception(f"Error processing message '{message}': {processing_e}")
                    await self._outgoing_queue.put(dict(answer=None, error=str(processing_e), is_ok=False, message=message, metadata=metadata))
                    logger.debug(f"Queue size is now {self._outgoing_queue.qsize()} after putting response.")
                finally:
                    # Keep the incoming queue's unfinished-task count accurate for join().
                    self._incoming_queue.task_done()

            except asyncio.TimeoutError:
                # No message within 0.1 s while running; loop around and re-check.
                pass
            except asyncio.QueueEmpty:
                # Only reachable while stopping: the incoming queue is fully drained,
                # so the while-condition ends the loop.
                logger.debug("Incoming queue empty during stopping phase.")
            except Exception as e:
                logger.exception(f"Unexpected error in processing loop outer try block: {e}")
                # Back off briefly so a persistent failure does not become a busy loop.
                await asyncio.sleep(0.1)

        logger.debug("Chatbot processing loop finished.")


    # --- Public Interface ---

    def listen(self, message: str, metadata: Optional[Dict[str,str]] = None) -> bool:
        """
        Queue a new message for asynchronous processing. Non-blocking; callable from any thread.

        ``asyncio.Queue`` is not thread-safe, so the ``put_nowait`` is scheduled
        onto the chatbot's own event loop with ``call_soon_threadsafe``. The
        message is processed later by the background loop.

        Args:
            message: the message text.
            metadata: optional metadata passed through to the chatbot (and
                echoed in error responses).

        Returns:
            True if the message was handed to the background loop. False if it
            was dropped because the chatbot is not running or its loop is
            unavailable.
        """
        # Read the loop reference once: the loop thread clears self._event_loop
        # during shutdown, possibly between a check and its use.
        loop = self._event_loop
        if not self.is_running() or loop is None or loop.is_closed():
            logger.warning(f"Chatbot not running or loop not ready. Message lost: '{message}', metadata {metadata}")
            return False

        try:
            loop.call_soon_threadsafe(self._incoming_queue.put_nowait, (message, metadata))
        except Exception as e:
            # e.g. RuntimeError if the loop closed after the check above.
            logger.exception(f"Failed to enqueue message {message}/{metadata} via call_soon_threadsafe: {e}")
            return False

        logger.debug(f"Message from {message}/{metadata} successfully queued.")
        return True
             # The message might be lost here depending on error handling design choice.


    async def start(self):
        """
        Launch the background thread that owns the chatbot's private event loop.

        Must be awaited from an asyncio context. The caller's loop is used only
        to poll, for at most 5 s, until the background loop and its processing
        task are ready. Calling ``start()`` while already running only logs a
        warning.

        Raises:
            ChatbotError: if the background loop does not become ready within
                the timeout, or its processing task has already finished right
                after startup. In both cases the thread is told to stop and the
                internal state is reset.
        """
        if self._running:
            logger.warning("Chatbot is already running.")
            return

        logger.debug("Starting Chatbot...")
        self._reset_async_primitives()
        self._running = True

        # daemon=True: a chatbot that is never stopped must not keep the process alive.
        self._loop_thread = threading.Thread(target=self._run_loop_in_thread, daemon=True)
        self._loop_thread.start()

        try:
            await asyncio.wait_for(self._wait_for_ready(), timeout=5.0)
        except asyncio.TimeoutError as e:
            logger.error("Chatbot failed to signal readiness within timeout.")
            await self._abort_start()
            raise ChatbotError("Failed to initialize internal processing within timeout.") from e

        # _wait_for_ready succeeded, but the loop thread may have failed right after.
        loop = self._event_loop
        task = self._processing_task
        if loop is None or loop.is_closed() or task is None or task.done():
            logger.error("Failed to initialize internal asyncio loop or processing task after wait.")
            await self._abort_start()
            raise ChatbotError("Failed to initialize internal processing.")

        logger.debug("Chatbot started successfully.")

    def _reset_async_primitives(self):
        """
        Give the upcoming run fresh asyncio queues and stop event.

        Since Python 3.10, asyncio.Event/Queue bind to the first event loop that
        waits on them. After a stop()/start() cycle, the old objects would make
        the new background loop fail with "bound to a different event loop".
        Items still sitting in the old queues are carried over.
        """
        old_in, old_out = self._incoming_queue, self._outgoing_queue
        self._stop_event = asyncio.Event()
        self._incoming_queue = asyncio.Queue()
        self._outgoing_queue = asyncio.Queue()
        # get_nowait/put_nowait never bind a queue to a loop.
        while not old_in.empty():
            self._incoming_queue.put_nowait(old_in.get_nowait())
        while not old_out.empty():
            self._outgoing_queue.put_nowait(old_out.get_nowait())

    async def _abort_start(self):
        """Clean up after a failed start(): signal the loop thread to exit, join it briefly, reset state."""
        self._running = False
        thread = self._loop_thread
        loop = self._event_loop
        if thread is not None and thread.is_alive():
            logger.warning("Attempting to join thread after failed start.")
            # The thread blocks on _stop_event until it is set, so joining
            # without signalling it first could only time out.
            if loop is not None and not loop.is_closed():
                try:
                    loop.call_soon_threadsafe(self._stop_event.set)
                except RuntimeError:
                    pass  # loop closed between the check and the call; thread is exiting
            try:
                # join() blocks; run it in a worker thread so the caller's loop stays responsive.
                await asyncio.to_thread(thread.join, 1.0)
                if thread.is_alive():
                    logger.warning("Loop thread did not exit within 1s after failed start.")
                else:
                    logger.debug("Thread joined after failed start.")
            except Exception as join_e:
                logger.exception(f"Error joining thread after failed start: {join_e}")
        self._loop_thread = None
        self._event_loop = None
        self._processing_task = None

    async def _wait_for_ready(self):
        """Internal coroutine to wait until the loop and task are initialized by the thread."""
        # Wait until event_loop and processing_task are set by _run_loop_in_thread
        # Also check if the task is actually running (not immediately done/error).
        while self._event_loop is None or self._processing_task is None or self._processing_task.done():
            # Need to yield control to the event loop
            await asyncio.sleep(0.01) # Sleep briefly


    async def stop(self):
        """
        Ask the background processing to shut down and wait for its thread to exit.

        Must be awaited from an asyncio context. The stop event is set on the
        background loop via ``call_soon_threadsafe``. The blocking ``join``
        (max 10 s) runs through ``asyncio.to_thread`` so the caller's loop is
        not blocked. Internal references are cleared afterwards whether or not
        the join succeeded. If the chatbot is neither running nor has a live
        thread, only the state is reset.
        """
        thread_active = self._loop_thread is not None and self._loop_thread.is_alive()
        if not (self._running or thread_active):
            logger.warning("Chatbot is not running and thread is not active.")
            # Reset anyway in case the state is inconsistent.
            self._running = False
            self._loop_thread = None
            self._event_loop = None
            self._processing_task = None
            return

        logger.debug("Stopping Chatbot...")
        self._running = False

        loop = self._event_loop
        if loop is None or loop.is_closed():
            logger.warning("Event loop not available or closed during stop signal. Cannot signal stop event.")
        else:
            try:
                # The event belongs to the background loop; set it from that loop's thread.
                loop.call_soon_threadsafe(self._stop_event.set)
                logger.debug("Stop event signalled thread-safely.")
            except Exception as e:
                # Still try to join the thread below.
                logger.exception(f"Error signalling stop event thread-safely: {e}")

        logger.debug("Waiting for internal loop thread to join...")
        thread = self._loop_thread
        if thread is not None:
            try:
                await asyncio.to_thread(thread.join, timeout=10.0)
                if thread.is_alive():
                    logger.error("Internal loop thread did not join within timeout!")
                else:
                    logger.debug("Internal loop thread joined successfully.")
            except Exception as e:
                logger.exception(f"Error waiting for internal loop thread to join: {e}")

        # Reset regardless of whether the join succeeded.
        self._loop_thread = None
        self._event_loop = None
        self._processing_task = None

        logger.debug("Chatbot stopped.")


    def is_running(self) -> bool:
        """
        Returns True if the chatbot's internal processing is intended to be active
        and the background thread is alive.
        """
        # Read the thread reference once: stop() and a failed start() reset
        # self._loop_thread to None, while listen()/get_next_response() call this
        # method from other threads. Re-reading the attribute after the None check
        # could therefore end up calling None.is_alive().
        thread = self._loop_thread
        return bool(self._running) and thread is not None and thread.is_alive()



    # Async generator to yield responses from the chatbot.
    async def responses(self, max_timeout: float = 5.0):
        """
        An async generator that yields responses from the chatbot as they are ready.
        Consume this using 'async for'. Requires an asyncio event loop to be running
        in the context where this generator is iterated.

        Args:
            max_timeout: Maximum time in seconds to keep waiting since the last
                response was received (or since iteration began). When exceeded,
                a single None is yielded and the generator finishes.

        Yields:
            Items taken from the outgoing queue; None once max_timeout elapses
            without a response; or a {"type": "generator_error", ...} dict when
            retrieving an item fails.

        Raises:
            asyncio.CancelledError: propagated if the consuming task is cancelled.

        NOTE(review): this awaits the outgoing queue on the caller's loop, whereas
        get_next_response() goes through the internal loop via
        run_coroutine_threadsafe — confirm which loop the queue belongs to.
        """
        # Nothing to drain and nothing will arrive: finish immediately.
        if not self.is_running() and self._outgoing_queue.empty():
            logger.warning("Chatbot not running and outgoing queue is empty, responses generator yielding nothing.")
            return

        logger.debug("Async responses generator started.")
        # Each wait is short so the loop condition and max_timeout are re-checked
        # regularly instead of blocking forever after stop() has been signalled.
        poll_interval = 0.1
        last_response_time = time.time()
        # Keep going while running, or while items processed during shutdown remain.
        while self.is_running() or not self._outgoing_queue.empty():
            try:
                response = await asyncio.wait_for(self._outgoing_queue.get(), timeout=poll_interval)
            except asyncio.TimeoutError:
                if time.time() - last_response_time > max_timeout:
                    logger.debug("Responses generator timed out waiting for response.")
                    # Signal the timeout to the consumer, then stop.
                    yield None
                    break
                continue
            except asyncio.CancelledError:
                # Only the consuming task can cancel this await. Swallowing the error
                # would turn task.cancel() into a normal end of iteration, so propagate it.
                logger.debug("Async responses generator cancelled during get.")
                raise
            except Exception as e:
                logger.exception(f"Error in async responses generator while getting/yielding: {e}")
                # Let the consumer see the error explicitly.
                yield {"type": "generator_error", "content": f"Error retrieving response: {e}"}
                # Back off before retrying so a persistent error is not a busy loop.
                await asyncio.sleep(poll_interval)
                continue

            last_response_time = time.time()
            logger.debug(f"Async generator yielding response: {response}")
            try:
                yield response
            finally:
                # Runs even if the consumer stops iterating (break/aclose) right after
                # receiving this item, keeping the queue's unfinished-task count balanced.
                self._outgoing_queue.task_done()

        logger.debug("Async responses generator finished.")


    # synchronous blocking read but with timeout
    def get_next_response(self, timeout: float | None = None):
        """
        Retrieves the next response, blocking until a response is available
        or the optional timeout occurs. Returns the response or None on timeout.
        If the response can also be None, there is no way to distinguish between the two.
        Can be called from any synchronous thread, provided start() has been called.
        Raises ChatbotError if the chatbot is not running and the queue is empty/loop unavailable.
        Returns None on timeout or if the chatbot is stopping and the queue becomes empty.
        """
        # Acquire lock to ensure only one thread calls this method at a time (optional but safer)
        with self._get_response_lock:
            # Check if the chatbot's background loop is running and ready.
            # If not running but the outgoing queue is NOT empty, we still allow
            # attempting to get messages to drain the queue during sync shutdown.
            # If not running AND queue is empty, there's nothing to get, return None immediately.
            if not self.is_running() and self._outgoing_queue.empty():
                 logger.warning("Sync getter: Not running and queue empty, returning None.")
                 return None

            # If not running but queue is NOT empty, or if running, we need the event loop.
            # Check if the event loop is available before trying to interact with it.
            if self._event_loop is None or self._event_loop.is_closed():
                 # This state might be reached if stop() is called and the loop thread is
                 # in the process of tearing down but the queue isn't quite empty yet.
                 # Log a warning and return None, as we cannot reliably interact with the loop.
                 logger.warning("Sync getter: Event loop is not available or closed while trying to get item.")
                 # Return None, as the consumer should check is_running() or timeout.
                 return None


            # If we reach here, either running OR not running but queue has items, AND loop is available.
            # Proceed to try and get from the queue via run_coroutine_threadsafe.
            try:
                # We need to run an async coroutine (self._outgoing_queue.get()) on the
                # chatbot's internal event loop from this external synchronous thread.
                # asyncio.run_coroutine_threadsafe is the standard and safe tool for this.
                # Note: asyncio.run_coroutine_threadsafe can raise RuntimeError if the loop is closed
                # or potentially other issues if called during a messy shutdown.

                logger.debug(f">>>>>> Sync getter: Attempting to get item from outgoing queue, size is {self._outgoing_queue.qsize()}")
                coro = self._outgoing_queue.get()
                future = asyncio.run_coroutine_threadsafe(coro, self._event_loop)

                # Wait for the future to complete in this thread (the calling synchronous thread),
                # with the specified timeout. This call blocks the current thread.
                # This can raise concurrent.futures.TimeoutError or exceptions propagated
                # from the coroutine (like exceptions put into the queue as error messages)
                # or concurrent.futures.CancelledError if the internal loop cancels the task.
                response = future.result(timeout=timeout)
                logger.debug(f">>>>>> DEBUG: internal Sync getter got response: {response}")

                # If we reached here, the .get() operation on the async queue was successful
                # and returned an item (response or error dict).
                # Now, signal task_done() back on the event loop thread, thread-safely.
                # Use call_soon_threadsafe to schedule the task_done operation.
                # Add check for event loop availability again before scheduling.
                if self._event_loop and not self._event_loop.is_closed():
                    try:
                        self._event_loop.call_soon_threadsafe(self._outgoing_queue.task_done)
                    except Exception as e:
                         # Log error if task_done cannot be scheduled (e.g., loop closed just now)
                         logger.exception(f"Error calling task_done thread-safely for {response}: {e}")
                         # Decide if this indicates an unhandled item or just a late cleanup message.
                         # For now, log and proceed.
                else:
                     logger.warning(f"Event loop not available to call task_done for {response}. Item might not be marked as done.")

                logger.debug(f"Sync getter retrieved response: {response}")
                return response

            except concurrent.futures.TimeoutError:
                 # This exception occurs if future.result(timeout=...) expires.
                 # It means no item was available in the async queue within the timeout.
                 # This is a normal occurrence when the queue is empty.
                 # logger.debug("Sync getter timed out waiting for response.")
                 logger.debug(f"DEBUG: internal Sync getter got TimeoutError exception")
                 return None # Standard behavior for blocking get with timeout is returning None.

            except concurrent.futures.CancelledError:
                 # This happens if the future/task was cancelled while waiting.
                 # This occurs when the internal asyncio loop is stopping/closing
                 # and our cleanup logic explicitly cancels pending tasks.
                 logger.debug("Sync get operation was cancelled (internal loop shutting down?).")
                 # Treat this as a non-critical event indicating shutdown or no more items available.
                 # Return None to allow consumption loops to check if chatbot is still running.
                 return None

            except RuntimeError as e:
                 # Catch RuntimeErrors, specifically 'Event loop is closed', which can happen
                 # if run_coroutine_threadsafe is called right as the loop is closing,
                 # or if a future completes after the loop is closed and tries cleanup.
                 if "Event loop is closed" in str(e):
                     logger.warning("Sync get called or completed when event loop is closed.")
                     # Return None as the loop is gone and no more items are expected via this path.
                     return None
                 else:
                      logger.exception(f"Unexpected RuntimeError in sync get: {e}")
                      # For other RuntimeErrors, re-raise as a ChatbotError
                      raise ChatbotError(f"Unexpected internal error during retrieval: {e}") from e

            except Exception as e:
                # Catch any other unexpected exceptions during future.result retrieval or
                # exceptions propagated from the coroutine itself (like processing errors
                # if the consumer code didn't handle the dict).
                logger.exception(f"Error in synchronous get_next_response during future result: {e}")
                # Decide how to handle - re-raise as a ChatbotError
                raise ChatbotError(f"Error retrieving response: {e}") from e

A chatbot class that processes messages asynchronously in a background thread and allows retrieving responses via either an async generator or a synchronous blocking method.

Methods

def chatbot_implementation(self, message: str, metadata: Dict[str, str] | None = None)
Expand source code
def chatbot_implementation(self, message: str, metadata: Optional[Dict[str,str]] = None):
    """
    Produce the reply for a single incoming message.

    Delegates to ``self.chatbot.reply``, forwarding the message text and its
    metadata (passed by keyword), and returns whatever that call returns.
    """
    backend = self.chatbot
    reply = backend.reply(message, metadata=metadata)
    return reply
def get_next_response(self, timeout: float | None = None)
Expand source code
def get_next_response(self, timeout: float | None = None):
    """
    Retrieves the next response, blocking until a response is available
    or the optional timeout occurs. Returns the response or None on timeout.
    If the response can also be None, there is no way to distinguish between the two.
    Can be called from any synchronous thread, provided start() has been called.
    Raises ChatbotError if the chatbot is not running and the queue is empty/loop unavailable.
    Returns None on timeout or if the chatbot is stopping and the queue becomes empty.
    """
    # Acquire lock to ensure only one thread calls this method at a time (optional but safer)
    with self._get_response_lock:
        # Check if the chatbot's background loop is running and ready.
        # If not running but the outgoing queue is NOT empty, we still allow
        # attempting to get messages to drain the queue during sync shutdown.
        # If not running AND queue is empty, there's nothing to get, return None immediately.
        if not self.is_running() and self._outgoing_queue.empty():
             logger.warning("Sync getter: Not running and queue empty, returning None.")
             return None

        # If not running but queue is NOT empty, or if running, we need the event loop.
        # Check if the event loop is available before trying to interact with it.
        if self._event_loop is None or self._event_loop.is_closed():
             # This state might be reached if stop() is called and the loop thread is
             # in the process of tearing down but the queue isn't quite empty yet.
             # Log a warning and return None, as we cannot reliably interact with the loop.
             logger.warning("Sync getter: Event loop is not available or closed while trying to get item.")
             # Return None, as the consumer should check is_running() or timeout.
             return None


        # If we reach here, either running OR not running but queue has items, AND loop is available.
        # Proceed to try and get from the queue via run_coroutine_threadsafe.
        try:
            # We need to run an async coroutine (self._outgoing_queue.get()) on the
            # chatbot's internal event loop from this external synchronous thread.
            # asyncio.run_coroutine_threadsafe is the standard and safe tool for this.
            # Note: asyncio.run_coroutine_threadsafe can raise RuntimeError if the loop is closed
            # or potentially other issues if called during a messy shutdown.

            logger.debug(f">>>>>> Sync getter: Attempting to get item from outgoing queue, size is {self._outgoing_queue.qsize()}")
            coro = self._outgoing_queue.get()
            future = asyncio.run_coroutine_threadsafe(coro, self._event_loop)

            # Wait for the future to complete in this thread (the calling synchronous thread),
            # with the specified timeout. This call blocks the current thread.
            # This can raise concurrent.futures.TimeoutError or exceptions propagated
            # from the coroutine (like exceptions put into the queue as error messages)
            # or concurrent.futures.CancelledError if the internal loop cancels the task.
            response = future.result(timeout=timeout)
            logger.debug(f">>>>>> DEBUG: internal Sync getter got response: {response}")

            # If we reached here, the .get() operation on the async queue was successful
            # and returned an item (response or error dict).
            # Now, signal task_done() back on the event loop thread, thread-safely.
            # Use call_soon_threadsafe to schedule the task_done operation.
            # Add check for event loop availability again before scheduling.
            if self._event_loop and not self._event_loop.is_closed():
                try:
                    self._event_loop.call_soon_threadsafe(self._outgoing_queue.task_done)
                except Exception as e:
                     # Log error if task_done cannot be scheduled (e.g., loop closed just now)
                     logger.exception(f"Error calling task_done thread-safely for {response}: {e}")
                     # Decide if this indicates an unhandled item or just a late cleanup message.
                     # For now, log and proceed.
            else:
                 logger.warning(f"Event loop not available to call task_done for {response}. Item might not be marked as done.")

            logger.debug(f"Sync getter retrieved response: {response}")
            return response

        except concurrent.futures.TimeoutError:
             # This exception occurs if future.result(timeout=...) expires.
             # It means no item was available in the async queue within the timeout.
             # This is a normal occurrence when the queue is empty.
             # logger.debug("Sync getter timed out waiting for response.")
             logger.debug(f"DEBUG: internal Sync getter got TimeoutError exception")
             return None # Standard behavior for blocking get with timeout is returning None.

        except concurrent.futures.CancelledError:
             # This happens if the future/task was cancelled while waiting.
             # This occurs when the internal asyncio loop is stopping/closing
             # and our cleanup logic explicitly cancels pending tasks.
             logger.debug("Sync get operation was cancelled (internal loop shutting down?).")
             # Treat this as a non-critical event indicating shutdown or no more items available.
             # Return None to allow consumption loops to check if chatbot is still running.
             return None

        except RuntimeError as e:
             # Catch RuntimeErrors, specifically 'Event loop is closed', which can happen
             # if run_coroutine_threadsafe is called right as the loop is closing,
             # or if a future completes after the loop is closed and tries cleanup.
             if "Event loop is closed" in str(e):
                 logger.warning("Sync get called or completed when event loop is closed.")
                 # Return None as the loop is gone and no more items are expected via this path.
                 return None
             else:
                  logger.exception(f"Unexpected RuntimeError in sync get: {e}")
                  # For other RuntimeErrors, re-raise as a ChatbotError
                  raise ChatbotError(f"Unexpected internal error during retrieval: {e}") from e

        except Exception as e:
            # Catch any other unexpected exceptions during future.result retrieval or
            # exceptions propagated from the coroutine itself (like processing errors
            # if the consumer code didn't handle the dict).
            logger.exception(f"Error in synchronous get_next_response during future result: {e}")
            # Decide how to handle - re-raise as a ChatbotError
            raise ChatbotError(f"Error retrieving response: {e}") from e

Retrieves the next response, blocking until a response is available or the optional timeout occurs. Returns the response or None on timeout. If the response can also be None, there is no way to distinguish between the two. Can be called from any synchronous thread, provided start() has been called. Also returns None (rather than raising) when the chatbot is not running and the outgoing queue is empty, when the internal event loop is unavailable or closed, or when the pending get is cancelled during shutdown. Raises ChatbotError only for other, unexpected errors during retrieval.

def is_running(self) -> bool
Expand source code
def is_running(self) -> bool:
    """
    Returns True if the chatbot's internal processing is intended to be active
    and the background thread is alive.
    """
    # Read the thread reference once: stop() and a failed start() reset
    # self._loop_thread to None, while listen()/get_next_response() call this
    # method from other threads. Re-reading the attribute after the None check
    # could therefore end up calling None.is_alive().
    thread = self._loop_thread
    return bool(self._running) and thread is not None and thread.is_alive()

Returns True if the chatbot's internal processing is intended to be active and the background thread is alive.

def listen(self, message: str, metadata: Dict[str, str] | None = None)
Expand source code
def listen(self, message: str, metadata: Optional[Dict[str,str]] = None):
    """
    Notify the chatbot about a new message. Can be called from any thread.
    Messages are queued for asynchronous processing. Non-blocking.

    The message is dropped, with a logged warning, if the chatbot is not running,
    its internal loop is unavailable, or the incoming queue is full.
    """
    # Snapshot the loop so a concurrent stop() resetting self._event_loop to None
    # cannot slip in between the availability check and the scheduling call.
    loop = self._event_loop
    if not self.is_running() or loop is None or loop.is_closed():
        logger.warning(f"Chatbot not running or loop not ready. Message lost: '{message}', metadata {metadata}")
        # A more advanced version could buffer messages internally here until start() is called.
        return

    item = (message, metadata)

    def _enqueue():
        # Runs on the internal loop thread: asyncio.Queue is not thread-safe, so the
        # put must happen there. put_nowait keeps listen() non-blocking; a full queue
        # is handled here instead of surfacing as an unhandled loop-callback error.
        try:
            self._incoming_queue.put_nowait(item)
        except asyncio.QueueFull:
            logger.warning(f"Incoming queue full. Message lost: '{message}', metadata {metadata}")

    try:
        loop.call_soon_threadsafe(_enqueue)
        logger.debug(f"Message from {message}/{metadata} successfully queued.")
    except Exception as e:
        # e.g. the loop was closed right after the check above; the message is lost.
        logger.exception(f"Failed to enqueue message {message}/{metadata} via call_soon_threadsafe: {e}")

Notify the chatbot about a new message. Can be called from any thread. Messages are queued for asynchronous processing. Non-blocking.

async def responses(self, max_timeout: float = 5.0)
Expand source code
async def responses(self, max_timeout: float = 5.0):
    """
    An async generator that yields responses from the chatbot as they are ready.
    Consume this using 'async for'. Requires an asyncio event loop to be running
    in the context where this generator is iterated.

    Args:
        max_timeout: Maximum time in seconds to keep waiting since the last
            response was received (or since iteration began). When exceeded,
            a single None is yielded and the generator finishes.

    Yields:
        Items taken from the outgoing queue; None once max_timeout elapses
        without a response; or a {"type": "generator_error", ...} dict when
        retrieving an item fails.

    Raises:
        asyncio.CancelledError: propagated if the consuming task is cancelled.

    NOTE(review): this awaits the outgoing queue on the caller's loop, whereas
    get_next_response() goes through the internal loop via
    run_coroutine_threadsafe — confirm which loop the queue belongs to.
    """
    # Nothing to drain and nothing will arrive: finish immediately.
    if not self.is_running() and self._outgoing_queue.empty():
        logger.warning("Chatbot not running and outgoing queue is empty, responses generator yielding nothing.")
        return

    logger.debug("Async responses generator started.")
    # Each wait is short so the loop condition and max_timeout are re-checked
    # regularly instead of blocking forever after stop() has been signalled.
    poll_interval = 0.1
    last_response_time = time.time()
    # Keep going while running, or while items processed during shutdown remain.
    while self.is_running() or not self._outgoing_queue.empty():
        try:
            response = await asyncio.wait_for(self._outgoing_queue.get(), timeout=poll_interval)
        except asyncio.TimeoutError:
            if time.time() - last_response_time > max_timeout:
                logger.debug("Responses generator timed out waiting for response.")
                # Signal the timeout to the consumer, then stop.
                yield None
                break
            continue
        except asyncio.CancelledError:
            # Only the consuming task can cancel this await. Swallowing the error
            # would turn task.cancel() into a normal end of iteration, so propagate it.
            logger.debug("Async responses generator cancelled during get.")
            raise
        except Exception as e:
            logger.exception(f"Error in async responses generator while getting/yielding: {e}")
            # Let the consumer see the error explicitly.
            yield {"type": "generator_error", "content": f"Error retrieving response: {e}"}
            # Back off before retrying so a persistent error is not a busy loop.
            await asyncio.sleep(poll_interval)
            continue

        last_response_time = time.time()
        logger.debug(f"Async generator yielding response: {response}")
        try:
            yield response
        finally:
            # Runs even if the consumer stops iterating (break/aclose) right after
            # receiving this item, keeping the queue's unfinished-task count balanced.
            self._outgoing_queue.task_done()

    logger.debug("Async responses generator finished.")

An async generator that yields responses from the chatbot as they are ready. Consume this using 'async for'. Requires an asyncio event loop to be running in the context where this generator is iterated.

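The max_timeout parameter specifies the maximum time, in seconds, to keep waiting since the last response was received (or since iteration began). Once it is exceeded, the generator yields a single None and then stops.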

async def start(self)
Expand source code
async def start(self):
    """
    Starts the internal chatbot processing thread and asyncio event loop.
    This method must be awaited and called from an asyncio context.

    Logs a warning and returns without doing anything if already running.

    Raises:
        ChatbotError: if the internal loop does not signal readiness within
            5 seconds, or if the event loop / processing task is not usable once
            readiness has been signalled. In both cases the internal state is
            reset (running flag, thread, loop and task references) before raising.
    """
    if self._running:
        logger.warning("Chatbot is already running.")
        return

    logger.debug("Starting Chatbot...")
    # Reset event and flag before starting the thread.
    self._stop_event.clear()
    self._running = True

    # daemon=True so a chatbot that is never stopped does not keep the process alive.
    self._loop_thread = threading.Thread(target=self._run_loop_in_thread, daemon=True)
    self._loop_thread.start()

    async def _abort_start() -> None:
        # Best-effort teardown shared by both failure paths below. We are already
        # inside a running event loop, so the blocking Thread.join is pushed to a
        # worker thread and awaited (asyncio.run() would raise RuntimeError here).
        self._running = False
        loop = self._event_loop
        if loop is not None and not loop.is_closed():
            try:
                # Ask the internal loop to wind down, the same way stop() does.
                loop.call_soon_threadsafe(self._stop_event.set)
            except Exception as signal_e:
                logger.exception(f"Error signalling stop event after failed start: {signal_e}")
        thread = self._loop_thread
        if thread is not None and thread.is_alive():
            logger.warning("Attempting to join thread after failed start.")
            try:
                await asyncio.wait_for(asyncio.to_thread(thread.join, timeout=1), timeout=2)
            except Exception as join_e:
                logger.exception(f"Error joining thread after failed start: {join_e}")
            else:
                if thread.is_alive():
                    logger.warning("Thread still alive after failed start; abandoning it (daemon thread).")
                else:
                    logger.debug("Thread joined after failed start.")
        # Ensure state is clean so start() can be retried.
        self._loop_thread = None
        self._event_loop = None
        self._processing_task = None

    # Wait until the internal loop and processing task signal readiness, bounded
    # so a broken startup cannot hang the caller.
    try:
        await asyncio.wait_for(self._wait_for_ready(), timeout=5.0)
    except asyncio.TimeoutError as e:
        logger.error("Chatbot failed to signal readiness within timeout.")
        await _abort_start()
        raise ChatbotError("Failed to initialize internal processing within timeout.") from e

    # Readiness was signalled, but initialization may still have failed right after.
    if self._event_loop is None or self._event_loop.is_closed() or \
       self._processing_task is None or self._processing_task.done():
        logger.error("Failed to initialize internal asyncio loop or processing task after wait.")
        await _abort_start()
        raise ChatbotError("Failed to initialize internal processing.")

    logger.debug("Chatbot started successfully.")

Starts the internal chatbot processing thread and asyncio event loop. This method must be awaited and called from an asyncio context.

async def stop(self)
Expand source code
async def stop(self):
    """
    Shut down the internal chatbot processing gracefully.

    Must be awaited from within an asyncio context. The stop event is set on the
    internal event loop via ``call_soon_threadsafe``; the loop thread is then joined
    through ``asyncio.to_thread`` (bounded by a 10 second timeout) so the calling
    loop keeps running while we wait. Afterwards the thread, loop and processing-task
    references are cleared, whether or not the join succeeded.
    """
    if not self._running:
        worker = self._loop_thread
        if worker is None or not worker.is_alive():
            # Nothing to stop; still normalise the state in case it drifted.
            logger.warning("Chatbot is not running and thread is not active.")
            self._running = False
            self._loop_thread = None
            self._event_loop = None
            self._processing_task = None
            return

    logger.debug("Stopping Chatbot...")
    # Clear the running flag immediately, before signalling the loop.
    self._running = False

    loop = self._event_loop
    if not loop or loop.is_closed():
        # Without a live loop the stop event cannot be delivered; the join below still runs.
        logger.warning("Event loop not available or closed during stop signal. Cannot signal stop event.")
    else:
        try:
            # The event belongs to the internal loop, so it is set from that loop's thread.
            loop.call_soon_threadsafe(self._stop_event.set)
            logger.debug("Stop event signalled thread-safely.")
        except Exception as e:
            # Best effort: a failed signal must not prevent the join attempt.
            logger.exception(f"Error signalling stop event thread-safely: {e}")

    logger.debug("Waiting for internal loop thread to join...")
    if self._loop_thread is not None:
        try:
            # Run the blocking join in a worker thread; the timeout keeps stop() from
            # hanging forever on a stuck loop thread.
            await asyncio.to_thread(self._loop_thread.join, timeout=10.0)
            timed_out = self._loop_thread is not None and self._loop_thread.is_alive()
            if timed_out:
                logger.error("Internal loop thread did not join within timeout!")
            else:
                logger.debug("Internal loop thread joined successfully.")
        except Exception as e:
            logger.exception(f"Error waiting for internal loop thread to join: {e}")

    # Drop all references unconditionally, even if the join failed or timed out.
    self._loop_thread = None
    self._event_loop = None
    self._processing_task = None

    logger.debug("Chatbot stopped.")

Signals the internal chatbot processing to stop gracefully. This method must be awaited and called from an asyncio context. It waits for the internal processing thread to finish.

class LoggingQueue (maxsize=0)
Expand source code
class LoggingQueue(asyncio.Queue):
    """
    An asyncio.Queue subclass that logs put and get operations.

    Only the awaitable put() and get() are instrumented; calling put_nowait() or
    get_nowait() directly uses the inherited, unlogged implementations.
    """
    async def put(self, item):
        """Put an item onto the queue and log the operation."""
        # Lazy %-style arguments: repr(item) is only computed when the DEBUG record is emitted.
        logger.debug("QUEUE: Attempting to put item onto queue (%d): %r", self.qsize(), item)
        await super().put(item)
        logger.debug("QUEUE: Successfully put item onto queue (%d): %r", self.qsize(), item)

    async def get(self):
        """Retrieve an item from the queue and log the operation."""
        logger.debug("QUEUE: Attempting to get item from queue (%d)...", self.qsize())
        item = await super().get()
        logger.debug("QUEUE: Successfully retrieved item from queue (%d): %r", self.qsize(), item)
        return item

An asyncio.Queue subclass that logs put and get operations.

Ancestors

  • asyncio.queues.Queue
  • asyncio.mixins._LoopBoundMixin

Methods

async def get(self)
Expand source code
async def get(self):
    """Retrieve an item from the queue and log the operation."""
    # Lazy %-style arguments: repr(item) is only computed when the DEBUG record is emitted.
    logger.debug("QUEUE: Attempting to get item from queue (%d)...", self.qsize())
    item = await super().get()
    logger.debug("QUEUE: Successfully retrieved item from queue (%d): %r", self.qsize(), item)
    return item

Retrieve an item from the queue and log the operation.

async def put(self, item)
Expand source code
async def put(self, item):
    """Put an item onto the queue and log the operation."""
    # Lazy %-style arguments: repr(item) is only computed when the DEBUG record is emitted.
    logger.debug("QUEUE: Attempting to put item onto queue (%d): %r", self.qsize(), item)
    await super().put(item)
    logger.debug("QUEUE: Successfully put item onto queue (%d): %r", self.qsize(), item)

Put an item onto the queue and log the operation.

class SerialChatbot (llm,
config=None,
initial_message=None,
message_template=None,
max_messages: int = 9999999)
Expand source code
class SerialChatbot:
    """
    Chatbot that keeps a chat history and answers one message at a time with a blocking LLM query.
    """
    def __init__(
            self,
            llm,
            config=None,
            initial_message=None,
            message_template=None,
            max_messages: int = 9999999,
    ):
        """
        Initialize the SerialChatbot with an LLM, configuration, and optional initial message and template.

        If the initial message is given, it is passed on as the first message of the whole chat context,
        followed by the first message to reply to and all subsequent messages.

        If a message template is given, then whenever the reply method gets a string as the message,
        the template is used and the variable "${message}" is replaced with the string. Also all other variables
        found in the template are replaced with the corresponding values from the optional metadata dictionary
        passed to the reply method.

        The template is not used in any way for the initial message, the initial message is used as is.

        Args:
            llm: The LLM to use for generating responses.
            config: The full configuration object or None.
            initial_message: The LLM Message to send initially to the LLM, if None, send nothing.
            message_template: The prompt template to use; if None, just send messages as role user.
            max_messages: Optional maximum number of messages to keep in the chat history. If None, no limit. If
                the number of messages exceeds this limit, the method compact_messages() is called to replace
                the messages with a compacted version.
        """
        self.llm = llm
        self.config = config
        self.llm_messages = []
        if initial_message:
            self.initial_message = any2message(initial_message)
            # Seed the history with the *converted* message(s): extending with the raw argument
            # would, e.g., add one entry per character when a plain string is passed.
            self.llm_messages.extend(deepcopy(self.initial_message))
        else:
            self.initial_message = None
        self.message_template = message_template
        self.max_messages = max_messages

    def reply(
            self,
            message: str|Dict[str,str]|List[Dict[str,str]],
            metadata: Optional[Dict[str,str]] = None):
        """
        Process a message and return a response.
        This method is blocking!

        If a message template is configured and ``message`` is a string, the template is rendered with
        the variable ``message`` set to that string plus every entry of ``metadata``. Both the new
        message(s) and the answer are added to the chat history via append_messages(), so the
        max_messages limit applies.

        Args:
            message: The message to process. Can be a string, a dictionary, or a list of dictionaries.
            metadata: Optional metadata dictionary for additional context.
        Returns:
            A dict with the keys answer (the response text generated by the LLM), error (None),
            is_ok (True), message (the converted message(s)), metadata and response (the raw
            result of the LLM query).
        Raises:
            ChatbotError: if the LLM query result reports an error. Exceptions raised while converting
                the message or querying the LLM propagate unchanged.
        """
        if isinstance(message, str) and self.message_template:
            # Template variables: the message itself plus all metadata entries
            # (a metadata entry named "message" overrides the message).
            template_vars = dict(message=message)
            if metadata:
                template_vars.update(metadata)
            message = any2message(self.message_template, vars=template_vars)
        else:
            message = any2message(message, vars=metadata)
        # Add the message to the chat history (compacting it if necessary)
        self.append_messages(message)

        ret = self.llm.query(self.llm_messages, return_cost=True)
        if ret.get("error"):
            raise ChatbotError(f"Error in LLM query: {ret['error']}")
        answer = ret["answer"]
        self.append_messages([{"role": "assistant", "content": answer}])
        return dict(answer=answer, error=None, is_ok=True, message=message, metadata=metadata, response=ret)

    def set_llm(self, llm: LLM):
        """
        Set the LLM to use for generating responses.
        """
        self.llm = llm

    def clear_history(self):
        """
        Clear the chat history. This removes all messages from the chat history and re-initializes it with
        a copy of the initial message, if any.
        """
        if self.initial_message is not None:
            self.llm_messages = deepcopy(self.initial_message)
        else:
            self.llm_messages = []

    def append_messages(self, messages: list[dict]):
        """
        Append messages to the chat history and shorten the history if necessary.

        Args:
            messages: Message dicts to add at the end of the history.

        If max_messages is set and the history grows beyond it, compact_messages() is called.
        max_messages=None means no limit.
        """
        self.llm_messages.extend(messages)
        # None means "no limit"; comparing an int against None would raise TypeError.
        if self.max_messages is not None and len(self.llm_messages) > self.max_messages:
            self.compact_messages()

    def compact_messages(self):
        """
        Default strategy for compacting the chat history down to at most max_messages entries.

        Without an initial message, the most recent max_messages messages are kept. With an initial
        message, a copy of the initial message(s) is kept at the front, and the remaining budget
        (max_messages minus the number of initial messages) is filled with the most recent messages
        that follow them.
        """
        cur_messages = self.llm_messages
        if self.initial_message is None:
            # Guard max_messages <= 0: the slice [-0:] would keep the whole list.
            self.llm_messages = cur_messages[-self.max_messages:] if self.max_messages > 0 else []
            return
        # initial_message is a list of messages (it is used as a whole history in clear_history),
        # so it must be concatenated, not wrapped in another list.
        head = deepcopy(self.initial_message)
        n_tail = self.max_messages - len(head)
        # The history starts with the initial message(s); skip them so they are not duplicated.
        tail = cur_messages[len(head):]
        self.llm_messages = head + (tail[-n_tail:] if n_tail > 0 else [])

Initialize the SerialChatbot with an LLM, configuration, and optional initial message and template.

If the initial message is given, it will be passed on as the first message of the whole chat context, followed by the first message to reply to and all subsequent messages.

If a message template is given, then whenever the reply method gets a string as the message, the template is used and the variable "${message}" is replaced with the string. Also all other variables found in the template are replaced with the corresponding values from the optional metadata dictionary passed to the reply method.

The template is not used in any way for the initial message, the initial message is used as is.

Args

llm
The LLM to use for generating responses.
config
The full configuration object or None.
initial_message
The LLM Message to send initially to the LLM, if None, send nothing.
message_template
The prompt template to use; if None, just send messages as role user.
max_messages
Optional maximum number of messages to keep in the chat history. If None, no limit. If the number of messages exceeds this limit, the method compact_messages() is called to replace the messages with a compacted version.

Subclasses

  • SimpleSerialChatbot
  • TestSerialChatbot

Methods

def append_messages(self, messages: list[dict])
Expand source code
def append_messages(self, messages: list[dict]):
    """
    Append messages to the chat history and shorten the history if necessary.

    Args:
        messages: Message dicts to add at the end of the history.

    If max_messages is set and the history grows beyond it, compact_messages() is called.
    max_messages=None means no limit.
    """
    self.llm_messages.extend(messages)
    # None means "no limit"; comparing an int against None would raise TypeError.
    if self.max_messages is not None and len(self.llm_messages) > self.max_messages:
        self.compact_messages()

Append messages to the chat history and shorten the history if necessary

def clear_history(self)
Expand source code
def clear_history(self):
    """
    Reset the chat history.

    Every message is discarded. If an initial message was configured, the history is
    re-seeded with a deep copy of it, so later edits to the history cannot alter it.
    """
    seed = self.initial_message
    self.llm_messages = [] if seed is None else deepcopy(seed)

Clear the chat history. This removes all messages from the chat history and re-initializes it with the initial message, if any.

def compact_messages(self)
Expand source code
def compact_messages(self):
    """
    Default strategy for compacting the chat history down to at most max_messages entries.

    Without an initial message, the most recent max_messages messages are kept. With an initial
    message, a copy of the initial message(s) is kept at the front, and the remaining budget
    (max_messages minus the number of initial messages) is filled with the most recent messages
    that follow them.
    """
    cur_messages = self.llm_messages
    if self.initial_message is None:
        # Guard max_messages <= 0: the slice [-0:] would keep the whole list.
        self.llm_messages = cur_messages[-self.max_messages:] if self.max_messages > 0 else []
        return
    # initial_message is a list of messages (it is used as a whole history in clear_history),
    # so it must be concatenated, not wrapped in another list.
    head = deepcopy(self.initial_message)
    n_tail = self.max_messages - len(head)
    # The history starts with the initial message(s); skip them so they are not duplicated.
    tail = cur_messages[len(head):]
    self.llm_messages = head + (tail[-n_tail:] if n_tail > 0 else [])

Default strategy for compacting messages in the chat history. This will keep max_messages last messages if there was no initial message, or max_messages - 1 if there was an initial message.

def reply(self,
message: str | Dict[str, str] | List[Dict[str, str]],
metadata: Dict[str, str] | None = None)
Expand source code
def reply(
        self,
        message: str|Dict[str,str]|List[Dict[str,str]],
        metadata: Optional[Dict[str,str]] = None):
    """
    Process a message and return a response.
    This method is blocking!

    If a message template is configured and ``message`` is a string, the template is rendered with
    the variable ``message`` set to that string plus every entry of ``metadata``. Both the new
    message(s) and the answer are added to the chat history via append_messages(), so the
    max_messages limit applies.

    Args:
        message: The message to process. Can be a string, a dictionary, or a list of dictionaries.
        metadata: Optional metadata dictionary for additional context.
    Returns:
        A dict with the keys answer (the response text generated by the LLM), error (None),
        is_ok (True), message (the converted message(s)), metadata and response (the raw
        result of the LLM query).
    Raises:
        ChatbotError: if the LLM query result reports an error. Exceptions raised while converting
            the message or querying the LLM propagate unchanged.
    """
    if isinstance(message, str) and self.message_template:
        # Template variables: the message itself plus all metadata entries
        # (a metadata entry named "message" overrides the message).
        template_vars = dict(message=message)
        if metadata:
            template_vars.update(metadata)
        message = any2message(self.message_template, vars=template_vars)
    else:
        message = any2message(message, vars=metadata)
    # Add the message to the chat history (compacting it if necessary)
    self.append_messages(message)

    ret = self.llm.query(self.llm_messages, return_cost=True)
    if ret.get("error"):
        raise ChatbotError(f"Error in LLM query: {ret['error']}")
    answer = ret["answer"]
    self.append_messages([{"role": "assistant", "content": answer}])
    return dict(answer=answer, error=None, is_ok=True, message=message, metadata=metadata, response=ret)

Process a message and return a response. This method is blocking!

Args

message
The message to process. Can be a string, a dictionary, or a list of dictionaries.
metadata
Optional metadata dictionary for additional context.

Returns

A dictionary with the keys answer (the response text generated by the LLM), error, is_ok, message, metadata and response. If any error occurs, an exception is raised.

def set_llm(self,
llm: LLM)
Expand source code
def set_llm(self, llm: LLM):
    """
    Replace the LLM used for generating responses.

    Only the LLM reference is swapped; the chat history is left untouched.
    """
    self.llm = llm

Set the LLM to use for generating responses.

class SimpleSerialChatbot (*args, **kwargs)
Expand source code
class SimpleSerialChatbot(SerialChatbot):
    """
    This implementation limits the reply function to just string messages.
    """
    def __init__(
            self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # list of (request, answer-or-error) tuples, one per reply() call
        self.history: list = []

    def clear_history(self):
        """Clear the chat history"""
        super().clear_history()
        self.history = []

    def reply(self, request: str) -> dict:
        """
        Send a single string request to the LLM, record the exchange and return the result.

        The request is converted with any2message and appended to the chat history (which may
        trigger compaction). If the LLM reports an error, an assistant message "Error: <error>"
        is appended to the chat history instead of an answer.

        Args:
            request: The user request text.

        Returns:
            A dict with the keys error, answer, is_ok, cost, n_prompt_tokens, n_completion_tokens
            and response (the raw LLM query result). On error, answer is None and is_ok is False;
            otherwise error is None and is_ok is True.
        """
        new_messages = any2message(request)
        # Lazy %-style arguments: the (possibly long) history is only formatted when DEBUG is enabled.
        logger.debug("SimpleSerialChatbot: reply called with request: %s, new_messages: %s", request, new_messages)
        self.append_messages(new_messages)
        logger.debug("SimpleSerialChatbot: reply: llm_messages before query: %s", self.llm_messages)
        response = self.llm.query(self.llm_messages, return_cost=True)
        logger.debug("SimpleSerialChatbot: reply: response is %s", response)
        # Cost/token fields are reported the same way for both outcomes.
        usage = dict(
            cost=response.get("cost", 0),
            n_prompt_tokens=response.get("n_prompt_tokens", 0),
            n_completion_tokens=response.get("n_completion_tokens", 0),
        )
        error = response.get("error")
        if error:
            self.history.append((request, error))
            self.append_messages([dict(role="assistant", content=f"Error: {error}")])
            return dict(error=error, answer=None, is_ok=False, **usage, response=response)
        answer = response["answer"]
        # Record successful exchanges too; previously only failures ended up in self.history.
        self.history.append((request, answer))
        self.append_messages([dict(role="assistant", content=answer)])
        return dict(error=None, answer=answer, **usage, is_ok=True, response=response)

This implementation limits the reply function to just string messages.

Initialize the SerialChatbot with an LLM, configuration, and optional initial message and template.

If the initial message is given, it will be passed on as the first message of the whole chat context, followed by the first message to reply to and all subsequent messages.

If a message template is given, then whenever the reply method gets a string as the message, the template is used and the variable "${message}" is replaced with the string. Also all other variables found in the template are replaced with the corresponding values from the optional metadata dictionary passed to the reply method.

The template is not used in any way for the initial message, the initial message is used as is.

Args

llm
The LLM to use for generating responses.
config
The full configuration object or None.
initial_message
The LLM Message to send initially to the LLM, if None, send nothing.
message_template
The prompt template to use; if None, just send messages as role user.
max_messages
Optional maximum number of messages to keep in the chat history. If None, no limit. If the number of messages exceeds this limit, the method compact_messages() is called to replace the messages with a compacted version.

Ancestors

  • SerialChatbot

Methods

def clear_history(self)
Expand source code
def clear_history(self):
    """
    Clear the chat history and the recorded request/response tuples.

    The base-class reset runs first; the local history list is replaced afterwards.
    """
    super().clear_history()
    self.history = list()

Clear the chat history

Inherited members

class TestSerialChatbot (llm,
config=None,
initial_message=None,
message_template=None,
max_messages: int = 9999999)
Expand source code
class TestSerialChatbot(SerialChatbot):
    """
    Fake SerialChatbot for testing: reply() sleeps and echoes the message instead of querying the LLM.
    """

    def reply(self, message: str, metadata: Optional[Dict[str,str]] = None) -> dict:
        """
        Simulate a simple chatbot reply. This is a blocking call.

        Recognized metadata keys: msg_id (default "??"), wait_time (seconds to sleep, default 0.5),
        simulate_error (raise an Exception) and simulate_no_answer (return answer=None).
        If metadata is None, a default is used whose wait_time is random between 0.5 and 2.0 seconds.
        """
        if metadata is None:
            metadata = dict(
                # Key must match the "wait_time" lookup below (it used to be "waittime" and was ignored).
                wait_time = random.uniform(0.5, 2.0),
                msg_id = "??",
                simulate_error = False, simulate_no_answer = False)
        msg_id = metadata.get("msg_id", "??")
        wait_time = metadata.get("wait_time", 0.5)
        simulate_error = metadata.get("simulate_error", False)
        simulate_no_answer = metadata.get("simulate_no_answer", False)
        time.sleep(wait_time)
        if simulate_error:
            raise Exception(f"{msg_id} >TestSerialChatbot: Simulated processing error.")
        if simulate_no_answer:
            answer = None
        else:
            answer = f"TestSerialChatbot: returning response to '{message}'"
        return dict(answer=answer, metadata=metadata, message=message, is_ok=True, error=None)

Initialize the SerialChatbot with an LLM, configuration, and optional initial message and template.

If the initial message is given, it will be passed on as the first message of the whole chat context, followed by the first message to reply to and all subsequent messages.

If a message template is given, then whenever the reply method gets a string as the message, the template is used and the variable "${message}" is replaced with the string. Also all other variables found in the template are replaced with the corresponding values from the optional metadata dictionary passed to the reply method.

The template is not used in any way for the initial message, the initial message is used as is.

Args

llm
The LLM to use for generating responses.
config
The full configuration object or None.
initial_message
The LLM Message to send initially to the LLM, if None, send nothing.
message_template
The prompt template to use; if None, just send messages as role user.
max_messages
Optional maximum number of messages to keep in the chat history. If None, no limit. If the number of messages exceeds this limit, the method compact_messages() is called to replace the messages with a compacted version.

Ancestors

  • SerialChatbot

Methods

def reply(self, message: str, metadata: Dict[str, str] | None = None) ‑> dict
Expand source code
def reply(self, message: str, metadata: Optional[Dict[str,str]] = None) -> dict:
    """
    Simulate a simple chatbot reply. This is a blocking call.

    Recognized metadata keys: msg_id (default "??"), wait_time (seconds to sleep, default 0.5),
    simulate_error (raise an Exception) and simulate_no_answer (return answer=None).
    If metadata is None, a default is used whose wait_time is random between 0.5 and 2.0 seconds.
    """
    if metadata is None:
        metadata = dict(
            # Key must match the "wait_time" lookup below (it used to be "waittime" and was ignored).
            wait_time = random.uniform(0.5, 2.0),
            msg_id = "??",
            simulate_error = False, simulate_no_answer = False)
    msg_id = metadata.get("msg_id", "??")
    wait_time = metadata.get("wait_time", 0.5)
    simulate_error = metadata.get("simulate_error", False)
    simulate_no_answer = metadata.get("simulate_no_answer", False)
    time.sleep(wait_time)
    if simulate_error:
        raise Exception(f"{msg_id} >TestSerialChatbot: Simulated processing error.")
    if simulate_no_answer:
        answer = None
    else:
        answer = f"TestSerialChatbot: returning response to '{message}'"
    return dict(answer=answer, metadata=metadata, message=message, is_ok=True, error=None)

Simulate a simple chatbot reply. This is a blocking call.

Inherited members