You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

173 lines
6.5 KiB

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using UnityEngine;
using UnityEngine.Events;
namespace UltraCombos
{
/// <summary>
/// Unity component that connects to an MQTT broker, subscribes to one topic filter and
/// hands every received message to listeners on the Unity main thread.
/// </summary>
/// <remarks>
/// MQTT client callbacks are treated as running off the main thread: received messages are
/// only queued there and are dispatched from <c>Update</c>. The connection is re-established
/// automatically after a disconnect until the component is destroyed.
/// </remarks>
public class MqttBridge : MonoBehaviour
{
    // Upper bound for a single connect attempt.
    private static readonly System.TimeSpan ConnectTimeout = System.TimeSpan.FromSeconds(5);

    // Pause between a disconnect and the next connect attempt.
    private static readonly System.TimeSpan ReconnectDelay = System.TimeSpan.FromSeconds(3);

    // How long OnDestroy blocks while waiting for a clean disconnect.
    private static readonly System.TimeSpan DisconnectTimeout = System.TimeSpan.FromSeconds(2);

    [SerializeField] private string brokerHost = "localhost";
    [SerializeField] private int brokerPort = 1883;
    [SerializeField] private string topic = "unity/#";
    [SerializeField] private bool verbose = true;

    /// <summary>Unity event carrying the topic and the payload of a received message.</summary>
    [System.Serializable]
    public class MessageEvent : UnityEvent<string, string> { }

    /// <summary>Invoked from <c>Update</c> with (topic, payload) for every received message.</summary>
    public MessageEvent onMessage;

    /// <summary>True while a client exists and reports an active connection.</summary>
    public bool IsConnected => mqttClient?.IsConnected ?? false;

    private IMqttClient mqttClient;
    private CancellationTokenSource cts;
    private string clientId;

    // Messages received on MQTT thread → dispatched on main thread in Update()
    private readonly ConcurrentQueue<(string topic, string payload)> messageQueue
        = new ConcurrentQueue<(string, string)>();

    private void Awake()
    {
        // Cache Unity main-thread-only API before any async/thread-pool usage
        clientId = $"unity_{SystemInfo.deviceUniqueIdentifier}";
    }

    private async void Start()
    {
        cts = new CancellationTokenSource();

        // ConfigureAwait(false): continuation doesn't need to resume on Unity main thread
        await ConnectAsync().ConfigureAwait(false);
    }

    private void Update()
    {
        // Drain the queue on the main thread so handlers can safely touch Unity objects
        while (messageQueue.TryDequeue(out var msg))
        {
            onMessage?.Invoke(msg.topic, msg.payload);
            HandleMessage(msg.topic, msg.payload);
        }
    }

    private void OnDestroy()
    {
        // Must be synchronous: at shutdown the Unity SynchronizationContext stops processing,
        // so async void OnDestroy would hang forever waiting for a continuation that never runs.
        cts?.Cancel();

        // Work on a local reference: a reconnect on a worker thread may swap the field.
        var client = mqttClient;
        if (client == null)
        {
            return;
        }

        // Stop listening first so the disconnect below cannot call back into a
        // component that is being destroyed.
        DetachHandlers(client);

        if (client.IsConnected)
        {
            try
            {
                if (!client.DisconnectAsync().Wait(DisconnectTimeout))
                {
                    Debug.LogWarning("<b>[MQTT]</b> Disconnect timed out.");
                }
            }
            catch (System.Exception e)
            {
                // Best effort only: report the failure, never throw out of OnDestroy.
                // Task.Wait wraps failures in AggregateException, hence GetBaseException().
                Debug.LogWarning($"<b>[MQTT]</b> Disconnect failed: {e.GetBaseException().Message}");
            }
        }

        client.Dispose();
    }

    // ── Connection ────────────────────────────────────────────────────────

    /// <summary>
    /// Replaces the current client (if any) with a fresh one and tries to connect it to
    /// the configured broker. Failures are logged, not thrown.
    /// </summary>
    private async Task ConnectAsync()
    {
        // Do not build a new client once shutdown has been requested.
        if (cts.IsCancellationRequested)
        {
            return;
        }

        var previous = mqttClient;
        if (previous != null)
        {
            DetachHandlers(previous);
            previous.Dispose();
            mqttClient = null;
        }

        // Set the client up through a local so a concurrent change of the field
        // cannot cause a NullReferenceException half-way through.
        var client = new MqttFactory().CreateMqttClient();
        var options = new MqttClientOptionsBuilder()
            .WithTcpServer(brokerHost, brokerPort)
            .WithClientId(clientId)
            .WithCleanSession()
            .Build();

        client.ApplicationMessageReceivedAsync += OnApplicationMessageReceived;
        client.ConnectedAsync += OnConnected;
        client.DisconnectedAsync += OnDisconnected;
        mqttClient = client;

        try
        {
            // Cancelled either by shutdown (cts) or by the connect timeout.
            using var connectCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token);
            connectCts.CancelAfter(ConnectTimeout);
            await client.ConnectAsync(options, connectCts.Token).ConfigureAwait(false);
        }
        catch (System.OperationCanceledException) when (cts.IsCancellationRequested)
        {
            // Cancelled by shutdown: expected, not an error worth reporting.
        }
        catch (System.Exception e)
        {
            Debug.LogError($"<b>[MQTT]</b> Connect failed: {e.Message}");
        }
    }

    /// <summary>Removes this component's handlers from <paramref name="client"/>.</summary>
    private void DetachHandlers(IMqttClient client)
    {
        client.ApplicationMessageReceivedAsync -= OnApplicationMessageReceived;
        client.ConnectedAsync -= OnConnected;
        client.DisconnectedAsync -= OnDisconnected;
    }

    /// <summary>Subscribes to the configured topic filter as soon as a connection is up.</summary>
    private async Task OnConnected(MqttClientConnectedEventArgs e)
    {
        Debug.Log("<b>[MQTT]</b> Connected.");
        await SubscribeAsync(topic).ConfigureAwait(false);
    }

    /// <summary>Schedules a reconnect after a disconnect unless shutdown was requested.</summary>
    private async Task OnDisconnected(MqttClientDisconnectedEventArgs e)
    {
        // Check first: a disconnect caused by shutdown must neither announce
        // nor schedule a reconnect.
        if (cts.IsCancellationRequested)
        {
            return;
        }

        Debug.LogWarning($"[MQTT] Disconnected. Reconnecting in {ReconnectDelay.TotalSeconds}s…");

        try
        {
            await Task.Delay(ReconnectDelay, cts.Token).ConfigureAwait(false);
            await ConnectAsync().ConfigureAwait(false);
        }
        catch (System.OperationCanceledException)
        {
            // Shutdown was requested while waiting (TaskCanceledException derives from
            // OperationCanceledException); nothing left to do.
        }
    }

    // ── Subscribe / Publish ───────────────────────────────────────────────

    /// <summary>Subscribes the current client to <paramref name="topicFilter"/>.</summary>
    private async Task SubscribeAsync(string topicFilter)
    {
        var filter = new MqttTopicFilterBuilder()
            .WithTopic(topicFilter)
            .Build();

        await mqttClient.SubscribeAsync(filter, cts.Token).ConfigureAwait(false);
        Debug.Log($"<b>[MQTT]</b> Subscribed to: {topicFilter}");
    }

    /// <summary>
    /// Publishes <paramref name="payload"/> on <paramref name="publishTopic"/>.
    /// Logs a warning and does nothing when there is no active connection.
    /// </summary>
    /// <param name="publishTopic">Topic to publish to.</param>
    /// <param name="payload">Message body, sent as a string payload.</param>
    public async Task PublishAsync(string publishTopic, string payload)
    {
        // Read the field once: a reconnect on another thread may null or replace it
        // between the connectivity check and the publish call.
        var client = mqttClient;
        if (client == null || !client.IsConnected)
        {
            Debug.LogWarning("<b>[MQTT]</b> Publish skipped — not connected.");
            return;
        }

        var message = new MqttApplicationMessageBuilder()
            .WithTopic(publishTopic)
            .WithPayload(payload)
            .Build();

        await client.PublishAsync(message, cts.Token).ConfigureAwait(false);
    }

    // ── Receive ───────────────────────────────────────────────────────────

    /// <summary>Queues an incoming message; it is dispatched later from <c>Update</c>.</summary>
    private Task OnApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
    {
        var incomingTopic = e.ApplicationMessage.Topic;
        var payload = e.ApplicationMessage.ConvertPayloadToString();

        // Enqueue so it's processed on the main thread in Update()
        messageQueue.Enqueue((incomingTopic, payload));
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called from <c>Update</c> for every received message, after <c>onMessage</c> has been
    /// invoked. Override this in a subclass, or subscribe to <c>onMessage</c> instead.
    /// The default implementation only logs the message when <c>verbose</c> is enabled.
    /// </summary>
    protected virtual void HandleMessage(string incomingTopic, string payload)
    {
        if (verbose)
        {
            Debug.Log($"<b>[MQTT]</b> {incomingTopic}: {payload}");
        }
    }
}
}