You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
173 lines
6.5 KiB
173 lines
6.5 KiB
using System.Collections.Concurrent;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using MQTTnet;
|
|
using MQTTnet.Client;
|
|
using UnityEngine;
|
|
using UnityEngine.Events;
|
|
|
|
namespace UltraCombos
|
|
{
|
|
public class MqttBridge : MonoBehaviour
|
|
{
|
|
[SerializeField] private string brokerHost = "localhost";
|
|
[SerializeField] private int brokerPort = 1883;
|
|
[SerializeField] private string topic = "unity/#";
|
|
[SerializeField] private bool verbose = true;
|
|
|
|
[System.Serializable]
|
|
public class MessageEvent : UnityEvent<string, string> { }
|
|
public MessageEvent onMessage;
|
|
public bool IsConnected => mqttClient?.IsConnected ?? false;
|
|
|
|
private IMqttClient mqttClient;
|
|
private CancellationTokenSource cts;
|
|
private string clientId;
|
|
|
|
// Messages received on MQTT thread → dispatched on main thread in Update()
|
|
private readonly ConcurrentQueue<(string topic, string payload)> messageQueue
|
|
= new ConcurrentQueue<(string, string)>();
|
|
|
|
private void Awake()
|
|
{
|
|
// Cache Unity main-thread-only API before any async/thread-pool usage
|
|
clientId = $"unity_{SystemInfo.deviceUniqueIdentifier}";
|
|
}
|
|
|
|
private async void Start()
|
|
{
|
|
cts = new CancellationTokenSource();
|
|
// ConfigureAwait(false): continuation doesn't need to resume on Unity main thread
|
|
await ConnectAsync().ConfigureAwait(false);
|
|
}
|
|
|
|
private void Update()
|
|
{
|
|
// Drain the queue on the main thread so handlers can safely touch Unity objects
|
|
while (messageQueue.TryDequeue(out var msg))
|
|
{
|
|
onMessage?.Invoke(msg.topic, msg.payload);
|
|
HandleMessage(msg.topic, msg.payload);
|
|
}
|
|
}
|
|
|
|
private void OnDestroy()
|
|
{
|
|
// Must be synchronous: at shutdown the Unity SynchronizationContext stops processing,
|
|
// so async void OnDestroy would hang forever waiting for a continuation that never runs.
|
|
cts?.Cancel();
|
|
if (mqttClient?.IsConnected == true)
|
|
{
|
|
try { mqttClient.DisconnectAsync().Wait(System.TimeSpan.FromSeconds(2)); }
|
|
catch { }
|
|
}
|
|
mqttClient?.Dispose();
|
|
}
|
|
|
|
// ── Connection ────────────────────────────────────────────────────────
|
|
|
|
private async Task ConnectAsync()
|
|
{
|
|
if (mqttClient != null)
|
|
{
|
|
mqttClient.ApplicationMessageReceivedAsync -= OnApplicationMessageReceived;
|
|
mqttClient.ConnectedAsync -= OnConnected;
|
|
mqttClient.DisconnectedAsync -= OnDisconnected;
|
|
mqttClient.Dispose();
|
|
mqttClient = null;
|
|
}
|
|
|
|
var factory = new MqttFactory();
|
|
mqttClient = factory.CreateMqttClient();
|
|
|
|
var options = new MqttClientOptionsBuilder()
|
|
.WithTcpServer(brokerHost, brokerPort)
|
|
.WithClientId(clientId)
|
|
.WithCleanSession()
|
|
.Build();
|
|
|
|
mqttClient.ApplicationMessageReceivedAsync += OnApplicationMessageReceived;
|
|
mqttClient.ConnectedAsync += OnConnected;
|
|
mqttClient.DisconnectedAsync += OnDisconnected;
|
|
|
|
try
|
|
{
|
|
using var connectCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token);
|
|
connectCts.CancelAfter(System.TimeSpan.FromSeconds(5));
|
|
await mqttClient.ConnectAsync(options, connectCts.Token).ConfigureAwait(false);
|
|
}
|
|
catch (System.Exception e)
|
|
{
|
|
Debug.LogError($"<b>[MQTT]</b> Connect failed: {e.Message}");
|
|
}
|
|
}
|
|
|
|
private async Task OnConnected(MqttClientConnectedEventArgs e)
|
|
{
|
|
Debug.Log("<b>[MQTT]</b> Connected.");
|
|
await SubscribeAsync(topic).ConfigureAwait(false);
|
|
}
|
|
|
|
private async Task OnDisconnected(MqttClientDisconnectedEventArgs e)
|
|
{
|
|
Debug.LogWarning($"[MQTT] Disconnected. Reconnecting in 3s…");
|
|
if (cts.IsCancellationRequested) return;
|
|
|
|
try
|
|
{
|
|
await Task.Delay(System.TimeSpan.FromSeconds(3), cts.Token)
|
|
.ConfigureAwait(false);
|
|
await ConnectAsync().ConfigureAwait(false);
|
|
}
|
|
catch (TaskCanceledException) { }
|
|
}
|
|
|
|
// ── Subscribe / Publish ───────────────────────────────────────────────
|
|
|
|
private async Task SubscribeAsync(string topicFilter)
|
|
{
|
|
var filter = new MqttTopicFilterBuilder()
|
|
.WithTopic(topicFilter)
|
|
.Build();
|
|
|
|
await mqttClient.SubscribeAsync(filter, cts.Token).ConfigureAwait(false);
|
|
Debug.Log($"<b>[MQTT]</b> Subscribed to: {topicFilter}");
|
|
}
|
|
|
|
public async Task PublishAsync(string publishTopic, string payload)
|
|
{
|
|
if (!IsConnected)
|
|
{
|
|
Debug.LogWarning("<b>[MQTT]</b> Publish skipped — not connected.");
|
|
return;
|
|
}
|
|
|
|
var message = new MqttApplicationMessageBuilder()
|
|
.WithTopic(publishTopic)
|
|
.WithPayload(payload)
|
|
.Build();
|
|
|
|
await mqttClient.PublishAsync(message, cts.Token).ConfigureAwait(false);
|
|
}
|
|
|
|
// ── Receive ───────────────────────────────────────────────────────────
|
|
|
|
private Task OnApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
|
|
{
|
|
var incomingTopic = e.ApplicationMessage.Topic;
|
|
var payload = e.ApplicationMessage.ConvertPayloadToString();
|
|
// Enqueue so it's processed on the main thread in Update()
|
|
messageQueue.Enqueue((incomingTopic, payload));
|
|
return Task.CompletedTask;
|
|
}
|
|
|
|
// Override this in a subclass, or subscribe to OnMessage instead
|
|
protected virtual void HandleMessage(string incomingTopic, string payload)
|
|
{
|
|
if (verbose)
|
|
{
|
|
Debug.Log($"<b>[MQTT]</b> {incomingTopic}: {payload}");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|