Real Time Container

The real time container feature in Mango.jl allows you to create and manage a container, which acts as the communication layer within the environment. A container is responsible for handling messages, forwarding them to the appropriate agents, and managing agent registration. The real time component means that the container acts on a real time clock, and does not differentiate between a simulation time and the execution time, which essentially means everything executed withing the real time container is executed immediately as stated in the code. In contrast, there is also a "simulation" container, which maintains an interal simulation time and only executes tasks and delivers messages according to the requested step_sizes (next event time). More on the simulation container can be found under Simulation Container. Note, that both container types implement the methods for the ContainerInterface and can therefore be drop-in replacements for the each other with slight differences in usage.

Container Struct

The Container struct represents the container as an actor within the environment. It is implemented using composition, making it flexible to use different protocols and codecs for message communication. The key components of the Container struct are:

  • protocol: The protocol used for message communication (e.g., TCP).
  • codec: A pair of functions for encoding and decoding messages in the container.

Start and Shutdown

Before using the container for message handling and agent management, you need to start the container using the start function. This function initializes the container's components and enables it to act as the communication layer. After you are done with the container, shutdown has to be called.

using Mango

# Create a container instance
container = Container()

# ... setup the container, agents, define handles, ...

# Start the container
wait(Threads.@spawn start(container))

# Execute some functionality to e.g. trigger the agent system

# Shut down the container
shutdown(container)
@info "Agent container and agents shutdown"
[ Info: Agent container and agents shutdown

However, this approach can be error-prone for multiple reasons. Besides simply forgetting to call shutdown, an exception may occur between the start and shutdown calls on the containers, leading to resource leaks. For this reason, we recommend using activate instead. With this function, the above `start/shutdown' pair translates to...

# Start the container and shut it down after the runnable (do ... end) has been executed.
activate(container) do

# Execute some functionality to e.g. trigger the agent system

end

Registering Agents

To enable the container to manage agents and handle their messaging activities, you can register agents using the register function. This function associates an agent with a unique agent ID (AID) and adds the agent to the container's internal list.

using Mango

# Create a container instance
container = Container()

# Define and create an agent
@agent struct MyAgent
    # Your agent's fields and methods here
end

my_agent = MyAgent()

# Register the agent with the container
register(container, my_agent)
Main.MyAgent(ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (8, 140718396928336, 140718396928464)), AgentContext(Container(OrderedCollections.OrderedDict{String, Agent}("agent0" => Main.MyAgent(#= circular reference @-4 =#)), 1, nothing, (Mango.encode, Mango.decode), false, nothing, nothing)), AgentRoleHandler(Role[], Tuple{Role, Function, Function}[], Tuple{Role, Function}[], Dict{Any, Vector{Tuple{Role, Function, Function}}}(), Dict{DataType, Any}()), Scheduler(ConcurrentCollections.Implementations.LinearProbingDict{Task, TaskData, Union{ConcurrentCollections.Implementations.Empty, Task}, Union{ConcurrentCollections.Implementations.Empty, ConcurrentCollections.Implementations.Moved{ConcurrentCollections.Implementations.Empty}, ConcurrentCollections.Implementations.Moved{TaskData}, TaskData}}(), Mango.DateTimeClock()), "agent0", Dict{String, Tuple}(), ForwardingRule[], Dict{DataType, Any}())

Sending Messages

To send messages between agents within the container, you can use the send_message function. The container routes the message to the specified receiver agent based on the receiver's AID.

using Mango

# Create a container instance
container = Container()
agent = register(container, PrintingAgent())

# ... Register agents ...

# Sending a message from one agent to another
wait(send_message(container, "Hello from Agent 1!", address(agent)))
┌ Info: Got
└   (message, meta) = ("Hello from Agent 1!", OrderedCollections.OrderedDict{String, Any}("receiver_id" => "agent0", "sender_id" => nothing, "tracking_id" => nothing, "sender_addr" => nothing))

TCP

This protocol allows communication over plain TCP connections, enabling message exchange between different entities within the Mango.jl simulation environment.

Introduction

The TCP Protocol in Mango.jl is a communication protocol used to exchange messages over plain TCP connections. It enables agents within the simulation environment to communicate with each other by establishing and managing TCP connections.

TCPProtocol Struct

The TCPProtocol struct represents the TCP Protocol within Mango.jl. It encapsulates the necessary functionalities for communication via TCP connections. Key features of the TCPProtocol struct are:

  • address: The InetAddr represents the address on which the TCP server listens.
  • server: A TCPServer instance used for accepting incoming connections.

Usage

To use the tcp protocol you need to construct a TCPProtocol struct and assign it to the protocol field in the container.

using Mango, Sockets

container2 = Container()
container2.protocol = TCPProtocol(address=Sockets.InetAddr("127.0.0.2", 2940))
TCPProtocol(Sockets.InetAddr{Sockets.IPv4}(ip"127.0.0.2", 2940), nothing, Mango.TCPConnectionPool(100000, ConcurrentUtilities.Pools.Pool{Sockets.InetAddr, Tuple{Sockets.TCPSocket, Dates.DateTime}}(Base.GenericCondition{ReentrantLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (1, 140717696774768, 14))), 100, 0, Dict{Sockets.InetAddr, Vector{Tuple{Sockets.TCPSocket, Dates.DateTime}}}(), #undef), ConcurrentUtilities.ReadWriteLock(ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (8, 0, 1735166787969)), Base.GenericCondition{ReentrantLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (0, 140717733371136, 1735166787969))), Base.Event(Base.GenericCondition{ReentrantLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (8, 140717696317184, 1735166787969))), true, false), 0, 0), false, Mango.AtomicCounter(0)))

It is also possible to use the convenience function create_tcp_container.

using Mango

container2 = create_tcp_container("127.0.0.2", 2940)
Container(OrderedCollections.OrderedDict{String, Agent}(), 0, TCPProtocol(Sockets.InetAddr{Sockets.IPv4}(ip"127.0.0.2", 2940), nothing, Mango.TCPConnectionPool(100000, ConcurrentUtilities.Pools.Pool{Sockets.InetAddr, Tuple{Sockets.TCPSocket, Dates.DateTime}}(Base.GenericCondition{ReentrantLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (0, 140717713131024, 0))), 100, 0, Dict{Sockets.InetAddr, Vector{Tuple{Sockets.TCPSocket, Dates.DateTime}}}(), #undef), ConcurrentUtilities.ReadWriteLock(ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (1, 140717697896064, 0)), Base.GenericCondition{ReentrantLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (1, 140718315091840, 140718229992976))), Base.Event(Base.GenericCondition{ReentrantLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.IntrusiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (1, 140718237898768, 0))), true, false), 0, 0), false, Mango.AtomicCounter(0))), (Mango.encode, Mango.decode), false, nothing, nothing)

MQTT

Introduction

The MQTT protocol enables sending via an MQTT message broker. It allows a container to subscribe to different topics on a broker and publish messages to them.

Currently, one container may only connect to a single broker. Subscribed topics for each agent are set on agent registration and tracked by the container. Incoming messages on these topics are distributed to the subscribing agents by the container.

MQTTProtocol Struct

The MQTTProtocol contains the status and channels of the underlying mosquitto C library (as abstracted to Julia by the Mosquitto.jl package).

The constructor takes a client_id and the broker_addr. Internally it also tracks the msg_channel and conn_channel, internal flags, the information to map topics to subscribing agents.

protocol = MQTTProtocol(cliant_id, broker_addr)

  • client_id - String id the container will communicate to the MQTT broker.
  • broker_addr - InetAddr of the MQTT broker

Usage

To use the mqtt protocol you need to construct a MQTTProtocol struct and assign it to the protocol field in the container. Further it is possible to use a convenience function for this It is also possible to use the convenience function create_mqtt_container.

using Mango, Sockets

container2 = Container()
container2.protocol = MQTTProtocol("my_id", Sockets.InetAddr(ip"127.0.0.2", 2940))
Running MQTT broker expected

The MQTT protocol expects an MQTT broker to run at the specified host and port.

Subscribing an agent to a topic can happen only as registration time and is not allowed otherwise. When registering a new agent to the container the topics to subscribe are passed by the topics keyword argument, taking a collection of String topic names. NOTE: It is recommended you pass a Vector{String} as this is what is tested. Other collections could work but no guarantees are given.

using Mango

container2 = create_mqtt_container("127.0.0.2", 2940, "MyMqttClient")

a1 = PrintingAgent()
register(container2, a1; topics=["topic1", "topic2"])