using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; using MQTTnet; using MQTTnet.Client; using UnityEngine; using UnityEngine.Events; namespace UltraCombos { public class MqttBridge : MonoBehaviour { [SerializeField] private string brokerHost = "localhost"; [SerializeField] private int brokerPort = 1883; [SerializeField] private string topic = "unity/#"; [SerializeField] private bool verbose = true; [System.Serializable] public class MessageEvent : UnityEvent { } public MessageEvent onMessage; public bool IsConnected => mqttClient?.IsConnected ?? false; private IMqttClient mqttClient; private CancellationTokenSource cts; private string clientId; // Messages received on MQTT thread → dispatched on main thread in Update() private readonly ConcurrentQueue<(string topic, string payload)> messageQueue = new ConcurrentQueue<(string, string)>(); private void Awake() { // Cache Unity main-thread-only API before any async/thread-pool usage clientId = $"unity_{SystemInfo.deviceUniqueIdentifier}"; } private async void Start() { cts = new CancellationTokenSource(); // ConfigureAwait(false): continuation doesn't need to resume on Unity main thread await ConnectAsync().ConfigureAwait(false); } private void Update() { // Drain the queue on the main thread so handlers can safely touch Unity objects while (messageQueue.TryDequeue(out var msg)) { onMessage?.Invoke(msg.topic, msg.payload); HandleMessage(msg.topic, msg.payload); } } private void OnDestroy() { // Must be synchronous: at shutdown the Unity SynchronizationContext stops processing, // so async void OnDestroy would hang forever waiting for a continuation that never runs. cts?.Cancel(); if (mqttClient?.IsConnected == true) { try { mqttClient.DisconnectAsync().Wait(System.TimeSpan.FromSeconds(2)); } catch { } } mqttClient?.Dispose(); } // ── Connection ──────────────────────────────────────────────────────── private async Task ConnectAsync() { if (mqttClient != null) { mqttClient.ApplicationMessageReceivedAsync -= OnApplicationMessageReceived; mqttClient.ConnectedAsync -= OnConnected; mqttClient.DisconnectedAsync -= OnDisconnected; mqttClient.Dispose(); mqttClient = null; } var factory = new MqttFactory(); mqttClient = factory.CreateMqttClient(); var options = new MqttClientOptionsBuilder() .WithTcpServer(brokerHost, brokerPort) .WithClientId(clientId) .WithCleanSession() .Build(); mqttClient.ApplicationMessageReceivedAsync += OnApplicationMessageReceived; mqttClient.ConnectedAsync += OnConnected; mqttClient.DisconnectedAsync += OnDisconnected; try { using var connectCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token); connectCts.CancelAfter(System.TimeSpan.FromSeconds(5)); await mqttClient.ConnectAsync(options, connectCts.Token).ConfigureAwait(false); } catch (System.Exception e) { Debug.LogError($"[MQTT] Connect failed: {e.Message}"); } } private async Task OnConnected(MqttClientConnectedEventArgs e) { Debug.Log("[MQTT] Connected."); await SubscribeAsync(topic).ConfigureAwait(false); } private async Task OnDisconnected(MqttClientDisconnectedEventArgs e) { Debug.LogWarning($"[MQTT] Disconnected. Reconnecting in 3s…"); if (cts.IsCancellationRequested) return; try { await Task.Delay(System.TimeSpan.FromSeconds(3), cts.Token) .ConfigureAwait(false); await ConnectAsync().ConfigureAwait(false); } catch (TaskCanceledException) { } } // ── Subscribe / Publish ─────────────────────────────────────────────── private async Task SubscribeAsync(string topicFilter) { var filter = new MqttTopicFilterBuilder() .WithTopic(topicFilter) .Build(); await mqttClient.SubscribeAsync(filter, cts.Token).ConfigureAwait(false); Debug.Log($"[MQTT] Subscribed to: {topicFilter}"); } public async Task PublishAsync(string publishTopic, string payload) { if (!IsConnected) { Debug.LogWarning("[MQTT] Publish skipped — not connected."); return; } var message = new MqttApplicationMessageBuilder() .WithTopic(publishTopic) .WithPayload(payload) .Build(); await mqttClient.PublishAsync(message, cts.Token).ConfigureAwait(false); } // ── Receive ─────────────────────────────────────────────────────────── private Task OnApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e) { var incomingTopic = e.ApplicationMessage.Topic; var payload = e.ApplicationMessage.ConvertPayloadToString(); // Enqueue so it's processed on the main thread in Update() messageQueue.Enqueue((incomingTopic, payload)); return Task.CompletedTask; } // Override this in a subclass, or subscribe to OnMessage instead protected virtual void HandleMessage(string incomingTopic, string payload) { if (verbose) { Debug.Log($"[MQTT] {incomingTopic}: {payload}"); } } } }