parent
f2578aa19d
commit
8aa58b1d6e
3 changed files with 332 additions and 73 deletions
@ -0,0 +1,173 @@ |
||||
using System.Collections.Concurrent; |
||||
using System.Threading; |
||||
using System.Threading.Tasks; |
||||
using MQTTnet; |
||||
using MQTTnet.Client; |
||||
using UnityEngine; |
||||
using UnityEngine.Events; |
||||
|
||||
namespace UltraCombos |
||||
{ |
||||
public class MqttBridge : MonoBehaviour |
||||
{ |
||||
[SerializeField] private string brokerHost = "localhost"; |
||||
[SerializeField] private int brokerPort = 1883; |
||||
[SerializeField] private string topic = "unity/#"; |
||||
[SerializeField] private bool verbose = true; |
||||
|
||||
[System.Serializable] |
||||
public class MessageEvent : UnityEvent<string, string> { } |
||||
public MessageEvent onMessage; |
||||
public bool IsConnected => mqttClient?.IsConnected ?? false; |
||||
|
||||
private IMqttClient mqttClient; |
||||
private CancellationTokenSource cts; |
||||
private string clientId; |
||||
|
||||
// Messages received on MQTT thread → dispatched on main thread in Update() |
||||
private readonly ConcurrentQueue<(string topic, string payload)> messageQueue |
||||
= new ConcurrentQueue<(string, string)>(); |
||||
|
||||
private void Awake() |
||||
{ |
||||
// Cache Unity main-thread-only API before any async/thread-pool usage |
||||
clientId = $"unity_{SystemInfo.deviceUniqueIdentifier}"; |
||||
} |
||||
|
||||
private async void Start() |
||||
{ |
||||
cts = new CancellationTokenSource(); |
||||
// ConfigureAwait(false): continuation doesn't need to resume on Unity main thread |
||||
await ConnectAsync().ConfigureAwait(false); |
||||
} |
||||
|
||||
private void Update() |
||||
{ |
||||
// Drain the queue on the main thread so handlers can safely touch Unity objects |
||||
while (messageQueue.TryDequeue(out var msg)) |
||||
{ |
||||
onMessage?.Invoke(msg.topic, msg.payload); |
||||
HandleMessage(msg.topic, msg.payload); |
||||
} |
||||
} |
||||
|
||||
private void OnDestroy() |
||||
{ |
||||
// Must be synchronous: at shutdown the Unity SynchronizationContext stops processing, |
||||
// so async void OnDestroy would hang forever waiting for a continuation that never runs. |
||||
cts?.Cancel(); |
||||
if (mqttClient?.IsConnected == true) |
||||
{ |
||||
try { mqttClient.DisconnectAsync().Wait(System.TimeSpan.FromSeconds(2)); } |
||||
catch { } |
||||
} |
||||
mqttClient?.Dispose(); |
||||
} |
||||
|
||||
// ── Connection ──────────────────────────────────────────────────────── |
||||
|
||||
private async Task ConnectAsync() |
||||
{ |
||||
if (mqttClient != null) |
||||
{ |
||||
mqttClient.ApplicationMessageReceivedAsync -= OnApplicationMessageReceived; |
||||
mqttClient.ConnectedAsync -= OnConnected; |
||||
mqttClient.DisconnectedAsync -= OnDisconnected; |
||||
mqttClient.Dispose(); |
||||
mqttClient = null; |
||||
} |
||||
|
||||
var factory = new MqttFactory(); |
||||
mqttClient = factory.CreateMqttClient(); |
||||
|
||||
var options = new MqttClientOptionsBuilder() |
||||
.WithTcpServer(brokerHost, brokerPort) |
||||
.WithClientId(clientId) |
||||
.WithCleanSession() |
||||
.Build(); |
||||
|
||||
mqttClient.ApplicationMessageReceivedAsync += OnApplicationMessageReceived; |
||||
mqttClient.ConnectedAsync += OnConnected; |
||||
mqttClient.DisconnectedAsync += OnDisconnected; |
||||
|
||||
try |
||||
{ |
||||
using var connectCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token); |
||||
connectCts.CancelAfter(System.TimeSpan.FromSeconds(5)); |
||||
await mqttClient.ConnectAsync(options, connectCts.Token).ConfigureAwait(false); |
||||
} |
||||
catch (System.Exception e) |
||||
{ |
||||
Debug.LogError($"<b>[MQTT]</b> Connect failed: {e.Message}"); |
||||
} |
||||
} |
||||
|
||||
private async Task OnConnected(MqttClientConnectedEventArgs e) |
||||
{ |
||||
Debug.Log("<b>[MQTT]</b> Connected."); |
||||
await SubscribeAsync(topic).ConfigureAwait(false); |
||||
} |
||||
|
||||
private async Task OnDisconnected(MqttClientDisconnectedEventArgs e) |
||||
{ |
||||
Debug.LogWarning($"[MQTT] Disconnected. Reconnecting in 3s…"); |
||||
if (cts.IsCancellationRequested) return; |
||||
|
||||
try |
||||
{ |
||||
await Task.Delay(System.TimeSpan.FromSeconds(3), cts.Token) |
||||
.ConfigureAwait(false); |
||||
await ConnectAsync().ConfigureAwait(false); |
||||
} |
||||
catch (TaskCanceledException) { } |
||||
} |
||||
|
||||
// ── Subscribe / Publish ─────────────────────────────────────────────── |
||||
|
||||
private async Task SubscribeAsync(string topicFilter) |
||||
{ |
||||
var filter = new MqttTopicFilterBuilder() |
||||
.WithTopic(topicFilter) |
||||
.Build(); |
||||
|
||||
await mqttClient.SubscribeAsync(filter, cts.Token).ConfigureAwait(false); |
||||
Debug.Log($"<b>[MQTT]</b> Subscribed to: {topicFilter}"); |
||||
} |
||||
|
||||
public async Task PublishAsync(string publishTopic, string payload) |
||||
{ |
||||
if (!IsConnected) |
||||
{ |
||||
Debug.LogWarning("<b>[MQTT]</b> Publish skipped — not connected."); |
||||
return; |
||||
} |
||||
|
||||
var message = new MqttApplicationMessageBuilder() |
||||
.WithTopic(publishTopic) |
||||
.WithPayload(payload) |
||||
.Build(); |
||||
|
||||
await mqttClient.PublishAsync(message, cts.Token).ConfigureAwait(false); |
||||
} |
||||
|
||||
// ── Receive ─────────────────────────────────────────────────────────── |
||||
|
||||
private Task OnApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e) |
||||
{ |
||||
var incomingTopic = e.ApplicationMessage.Topic; |
||||
var payload = e.ApplicationMessage.ConvertPayloadToString(); |
||||
// Enqueue so it's processed on the main thread in Update() |
||||
messageQueue.Enqueue((incomingTopic, payload)); |
||||
return Task.CompletedTask; |
||||
} |
||||
|
||||
// Override this in a subclass, or subscribe to OnMessage instead |
||||
protected virtual void HandleMessage(string incomingTopic, string payload) |
||||
{ |
||||
if (verbose) |
||||
{ |
||||
Debug.Log($"<b>[MQTT]</b> {incomingTopic}: {payload}"); |
||||
} |
||||
} |
||||
} |
||||
} |
||||
@ -0,0 +1,2 @@ |
||||
fileFormatVersion: 2 |
||||
guid: 2d9fec57d2f43b54cb83a1310af13701 |
||||
Loading…
Reference in new issue