From 8f35d13e17d362074a9032f89e68ac651f68acb4 Mon Sep 17 00:00:00 2001 From: turtleDeng Date: Sat, 22 Sep 2018 16:19:28 +0800 Subject: [PATCH] Improve bridges design (#1849) Improve the design of bridges --- etc/emqx.conf | 284 +++++++++++++++++++++------------------- priv/emqx.schema | 18 +-- src/emqx_bridge.erl | 194 +++++++++++++++++---------- src/emqx_bridge_sup.erl | 2 +- src/emqx_client.erl | 2 +- 5 files changed, 284 insertions(+), 216 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 635ccf68c..dc039c199 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1557,241 +1557,257 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G ##-------------------------------------------------------------------- ##-------------------------------------------------------------------- -## Bridges to edge +## Bridges to aws ##-------------------------------------------------------------------- -## Bridge type. -## -## Value: Enum -## Example: out | in -bridge.edge.type = in - -## Bridge address: node name for local bridge, host:port for remote. -## -## Value: String -## Example: emqx@127.0.0.1, 127.0.0.1:1883 -bridge.edge.address = 127.0.0.1:1883 - -## Protocol version of the bridge. -## -## Value: Enum -## - mqtt5 -## - mqtt4 -## - mqtt3 -bridge.edge.proto_ver = mqtt4 - -## The ClientId of a remote bridge. -## -## Value: String -bridge.edge.client_id = bridge_edge - -## The Clean start flag of a remote bridge. -## -## Value: boolean -bridge.edge.clean_start = false - -## The username for a remote bridge. -## -## Value: String -bridge.edge.username = user - -## The password for a remote bridge. -## -## Value: String -bridge.edge.password = passwd - -## Mountpoint of the bridge. -## -## Value: String -## bridge.edge.mountpoint = bridge/edge/ - -## Ping interval of a down bridge. -## -## Value: Duration -## Default: 10 seconds -bridge.edge.keepalive = 10s - -## Subscriptions of the bridge topic. -## -## Value: String -bridge.edge.subscription.1.topic = # - -## Subscriptions of the bridge qos. -## -## Value: Number -bridge.edge.subscription.1.qos = 1 - -## The pending message queue of a bridge. -## -## Value: Number -bridge.edge.max_pending_messages = 10000 - ## Start type of the bridge. ## ## Value: enum ## manual ## auto -bridge.edge.start_type = manual - -## Bridge reconnect count. -## -## Value: Number -bridge.edge.reconnect_count = 10 +bridge.aws.start_type = manual ## Bridge reconnect time. ## ## Value: Duration ## Default: 30 seconds -bridge.edge.reconnect_time = 30s - -## PEM-encoded CA certificates of the bridge. -## -## Value: File -## bridge.edge.cacertfile = cacert.pem - -## SSL Certfile of the bridge. -## -## Value: File -## bridge.edge.certfile = cert.pem - -## SSL Keyfile of the bridge. -## -## Value: File -## bridge.edge.keyfile = key.pem - -## SSL Ciphers used by the bridge. -## -## Value: String -## bridge.edge.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 - -## TLS versions used by the bridge. -## -## Value: String -## bridge.edge.tls_versions = tlsv1.2,tlsv1.1,tlsv1 - - -##-------------------------------------------------------------------- -## Bridges to cloud -##-------------------------------------------------------------------- -## Bridge type. -## -## Value: Enum -## Example: out | in -bridge.cloud.type = out +bridge.aws.reconnect_interval = 30s ## Bridge address: node name for local bridge, host:port for remote. ## ## Value: String ## Example: emqx@127.0.0.1, 127.0.0.1:1883 -bridge.cloud.address = 127.0.0.1:1883 +bridge.aws.address = 127.0.0.1:1883 ## Protocol version of the bridge. ## ## Value: Enum -## - mqtt5 -## - mqtt4 -## - mqtt3 -bridge.cloud.proto_ver = mqtt4 +## - mqttv5 +## - mqttv4 +## - mqttv3 +bridge.aws.proto_ver = mqttv4 ## The ClientId of a remote bridge. ## ## Value: String -bridge.cloud.client_id = bridge_cloud +bridge.aws.client_id = bridge_aws ## The Clean start flag of a remote bridge. ## ## Value: boolean -bridge.cloud.clean_start = false +bridge.aws.clean_start = false ## The username for a remote bridge. ## ## Value: String -bridge.cloud.username = user +bridge.aws.username = user ## The password for a remote bridge. ## ## Value: String -bridge.cloud.password = passwd +bridge.aws.password = passwd ## Mountpoint of the bridge. ## ## Value: String -bridge.cloud.mountpoint = bridge/edge/${node}/ +bridge.aws.mountpoint = bridge/aws/${node}/ ## Ping interval of a down bridge. ## ## Value: Duration ## Default: 10 seconds -bridge.cloud.keepalive = 10s +bridge.aws.keepalive = 60s ## Forward message topics ## ## Value: String ## Example: topic1/#,topic2/# -bridge.cloud.forward_rule = # +bridge.aws.forwards = topic1/#,topic2/# ## Subscriptions of the bridge topic. ## ## Value: String -bridge.cloud.subscription.1.topic = $share/cmd/topic1 +bridge.aws.subscription.1.topic = cmd/topic1 ## Subscriptions of the bridge qos. ## ## Value: Number -bridge.cloud.subscription.1.qos = 1 +bridge.aws.subscription.1.qos = 1 -## Bridge store message type. +## Subscriptions of the bridge topic. +## +## Value: String +bridge.aws.subscription.2.topic = cmd/topic2 + +## Subscriptions of the bridge qos. +## +## Value: Number +bridge.aws.subscription.2.qos = 1 + +## Bridge message queue message type. ## ## Value: Enum ## Example: memory | disk -bridge.cloud.store_type = memory +bridge.aws.mqueue_type = memory ## The pending message queue of a bridge. ## ## Value: Number -bridge.cloud.max_pending_messages = 10000 +bridge.aws.max_pending_messages = 10000 + +## PEM-encoded CA certificates of the bridge. +## +## Value: File +## bridge.aws.cacertfile = cacert.pem + +## SSL Certfile of the bridge. +## +## Value: File +## bridge.aws.certfile = cert.pem + +## SSL Keyfile of the bridge. +## +## Value: File +## bridge.aws.keyfile = key.pem + +## SSL Ciphers used by the bridge. +## +## Value: String +## bridge.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 + +## TLS versions used by the bridge. +## +## Value: String +## bridge.aws.tls_versions = tlsv1.2,tlsv1.1,tlsv1 + +##-------------------------------------------------------------------- +## Bridges to azure +##-------------------------------------------------------------------- ## Start type of the bridge. ## ## Value: enum ## manual ## auto -bridge.cloud.start_type = manual +## bridge.azure.start_type = manual ## Bridge reconnect count. ## ## Value: Number -bridge.cloud.reconnect_count = 10 +## bridge.azure.reconnect_count = 10 ## Bridge reconnect time. ## ## Value: Duration ## Default: 30 seconds -bridge.cloud.reconnect_time = 30s +## bridge.azure.reconnect_time = 30s + +## Bridge address: node name for local bridge, host:port for remote. +## +## Value: String +## Example: emqx@127.0.0.1, 127.0.0.1:1883 +## bridge.azure.address = 127.0.0.1:1883 + +## Protocol version of the bridge. +## +## Value: Enum +## - mqttv5 +## - mqttv4 +## - mqttv3 +## bridge.azure.proto_ver = mqttv4 + +## The ClientId of a remote bridge. +## +## Value: String +## bridge.azure.client_id = bridge_azure + +## The Clean start flag of a remote bridge. +## +## Value: boolean +## bridge.azure.clean_start = false + +## The username for a remote bridge. +## +## Value: String +## bridge.azure.username = user + +## The password for a remote bridge. +## +## Value: String +## bridge.azure.password = passwd + +## Mountpoint of the bridge. +## +## Value: String +## bridge.azure.mountpoint = bridge/azure/${node}/ + +## Ping interval of a down bridge. +## +## Value: Duration +## Default: 10 seconds +## bridge.azure.keepalive = 10s + +## Forward message topics +## +## Value: String +## Example: topic1/#,topic2/# +## bridge.azure.forwards = topic1/#,topic2/# + +## Subscriptions of the bridge topic. +## +## Value: String +## bridge.azure.subscription.1.topic = $share/cmd/topic1 + +## Subscriptions of the bridge qos. +## +## Value: Number +## bridge.azure.subscription.1.qos = 1 + +## Subscriptions of the bridge topic. +## +## Value: String +## bridge.azure.subscription.2.topic = $share/cmd/topic2 + +## Subscriptions of the bridge qos. +## +## Value: Number +## bridge.azure.subscription.2.qos = 1 + +## Bridge store message type. +## +## Value: Enum +## Example: memory | disk +## bridge.azure.store_type = memory + +## The pending message queue of a bridge. +## +## Value: Number +## bridge.azure.max_pending_messages = 10000 + ## PEM-encoded CA certificates of the bridge. ## ## Value: File -## bridge.cloud.cacertfile = cacert.pem +## bridge.azure.cacertfile = cacert.pem ## SSL Certfile of the bridge. ## ## Value: File -## bridge.cloud.certfile = cert.pem +## bridge.azure.certfile = cert.pem ## SSL Keyfile of the bridge. ## ## Value: File -## bridge.cloud.keyfile = key.pem +## bridge.azure.keyfile = key.pem ## SSL Ciphers used by the bridge. ## ## Value: String -## bridge.cloud.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 +## bridge.azure.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 ## TLS versions used by the bridge. ## ## Value: String -## bridge.cloud.tls_versions = tlsv1.2,tlsv1.1,tlsv1 +## bridge.azure.tls_versions = tlsv1.2,tlsv1.1,tlsv1 ##-------------------------------------------------------------------- ## Modules diff --git a/priv/emqx.schema b/priv/emqx.schema index 5f148ac96..c5afd9b43 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1492,12 +1492,7 @@ end}. %%-------------------------------------------------------------------- %% Bridges %%-------------------------------------------------------------------- - -{mapping, "bridge.$name.type", "emqx.bridges", [ - {datatype, {enum, [in, out]}} -]}. - -{mapping, "bridge.$name.store_type", "emqx.bridges", [ +{mapping, "bridge.$name.mqueue_type", "emqx.bridges", [ {datatype, {enum, [memory, disk]}} ]}. @@ -1506,7 +1501,7 @@ end}. ]}. {mapping, "bridge.$name.proto_ver", "emqx.bridges", [ - {datatype, {enum, [mqtt3, mqtt4, mqtt5]}} + {datatype, {enum, [mqttv3, mqttv4, mqttv5]}} ]}. {mapping, "bridge.$name.client_id", "emqx.bridges", [ @@ -1530,7 +1525,7 @@ end}. {datatype, string} ]}. -{mapping, "bridge.$name.forward_rule", "emqx.bridges", [ +{mapping, "bridge.$name.forwards", "emqx.bridges", [ {datatype, string} ]}. @@ -1577,12 +1572,7 @@ end}. {default, auto} ]}. -{mapping, "bridge.$name.reconnect_count", "emqx.bridges", [ - {default, 10}, - {datatype, integer} -]}. - -{mapping, "bridge.$name.reconnect_time", "emqx.bridges", [ +{mapping, "bridge.$name.reconnect_interval", "emqx.bridges", [ {default, "30s"}, {datatype, {duration, s}} ]}. diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index a7ce2581d..461564bd2 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -23,12 +23,16 @@ -export([start_link/2, start_bridge/1, stop_bridge/1, status/1]). +-export([show_forwards/1, add_forward/2, del_forward/2]). + +-export([show_subscriptions/1, add_subscription/3, del_subscription/2]). + -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {client_pid, options, reconnect_time, reconnect_count, - def_reconnect_count, type, mountpoint, queue, store_type, - max_pending_messages}). +-record(state, {client_pid, options, reconnect_interval, + mountpoint, queue, mqueue_type, max_pending_messages, + forwards = [], subscriptions = []}). -record(mqtt_msg, {qos = ?QOS0, retain = false, dup = false, packet_id, topic, props, payload}). @@ -42,6 +46,50 @@ start_bridge(Name) -> stop_bridge(Name) -> gen_server:call(name(Name), stop_bridge). +-spec(show_forwards(atom()) -> list()). +show_forwards(Name) -> + gen_server:call(name(Name), show_forwards). + +-spec(add_forward(atom(), binary()) -> ok | {error, already_exists | validate_fail}). +add_forward(Name, Topic) -> + case catch emqx_topic:validate({filter, Topic}) of + true -> + gen_server:call(name(Name), {add_forward, Topic}); + {'EXIT', _Reason} -> + {error, validate_fail} + end. + +-spec(del_forward(atom(), binary()) -> ok | {error, validate_fail}). +del_forward(Name, Topic) -> + case catch emqx_topic:validate({filter, Topic}) of + true -> + gen_server:call(name(Name), {del_forward, Topic}); + _ -> + {error, validate_fail} + end. + +-spec(show_subscriptions(atom()) -> list()). +show_subscriptions(Name) -> + gen_server:call(name(Name), show_subscriptions). + +-spec(add_subscription(atom(), binary(), integer()) -> ok | {error, already_exists | validate_fail}). +add_subscription(Name, Topic, QoS) -> + case catch emqx_topic:validate({filter, Topic}) of + true -> + gen_server:call(name(Name), {add_subscription, Topic, QoS}); + {'EXIT', _Reason} -> + {error, validate_fail} + end. + +-spec(del_subscription(atom(), binary()) -> ok | {error, validate_fail}). +del_subscription(Name, Topic) -> + case catch emqx_topic:validate({filter, Topic}) of + true -> + gen_server:call(name(Name), {del_subscription, Topic}); + _ -> + {error, validate_fail} + end. + status(Pid) -> gen_server:call(Pid, status). @@ -55,41 +103,78 @@ init([Options]) -> manual -> ok; auto -> erlang:send_after(1000, self(), start) end, - ReconnectCount = get_value(reconnect_count, Options, 10), - ReconnectTime = get_value(reconnect_time, Options, 30000), + ReconnectInterval = get_value(reconnect_interval, Options, 30000), MaxPendingMsg = get_value(max_pending_messages, Options, 10000), Mountpoint = format_mountpoint(get_value(mountpoint, Options)), - StoreType = get_value(store_type, Options, memory), - Type = get_value(type, Options, in), + MqueueType = get_value(mqueue_type, Options, memory), Queue = [], - {ok, #state{type = Type, - mountpoint = Mountpoint, - queue = Queue, - store_type = StoreType, - options = Options, - reconnect_count = ReconnectCount, - reconnect_time = ReconnectTime, - def_reconnect_count = ReconnectCount, + {ok, #state{mountpoint = Mountpoint, + queue = Queue, + mqueue_type = MqueueType, + options = Options, + reconnect_interval = ReconnectInterval, max_pending_messages = MaxPendingMsg}}. handle_call(start_bridge, _From, State = #state{client_pid = undefined}) -> {noreply, NewState} = handle_info(start, State), - {reply, <<"start bridge successfully">>, NewState}; + {reply, #{msg => <<"start bridge successfully">>}, NewState}; handle_call(start_bridge, _From, State) -> - {reply, <<"bridge already started">>, State}; + {reply, #{msg => <<"bridge already started">>}, State}; handle_call(stop_bridge, _From, State = #state{client_pid = undefined}) -> - {reply, <<"bridge not started">>, State}; + {reply, #{msg => <<"bridge not started">>}, State}; handle_call(stop_bridge, _From, State = #state{client_pid = Pid}) -> emqx_client:disconnect(Pid), - {reply, <<"stop bridge successfully">>, State}; + {reply, #{msg => <<"stop bridge successfully">>}, State}; handle_call(status, _From, State = #state{client_pid = undefined}) -> - {reply, <<"Stopped">>, State}; + {reply, #{status => <<"Stopped">>}, State}; handle_call(status, _From, State = #state{client_pid = _Pid})-> - {reply, <<"Running">>, State}; + {reply, #{status => <<"Running">>}, State}; + +handle_call(show_forwards, _From, State = #state{forwards = Forwards}) -> + {reply, Forwards, State}; + +handle_call({add_forward, Topic}, _From, State = #state{forwards = Forwards}) -> + case not lists:member(Topic, Forwards) of + true -> + emqx_broker:subscribe(Topic), + {reply, ok, State#state{forwards = [Topic | Forwards]}}; + false -> + {reply, {error, already_exists}, State} + end; + +handle_call({del_forward, Topic}, _From, State = #state{forwards = Forwards}) -> + case lists:member(Topic, Forwards) of + true -> + emqx_broker:unsubscribe(Topic), + {reply, ok, State#state{forwards = lists:delete(Topic, Forwards)}}; + false -> + {reply, ok, State} + end; + +handle_call(show_subscriptions, _From, State = #state{subscriptions = Subscriptions}) -> + {reply, Subscriptions, State}; + +handle_call({add_subscription, Topic, Qos}, _From, State = #state{subscriptions = Subscriptions, client_pid = ClientPid}) -> + case not lists:keymember(Topic, 1, Subscriptions) of + true -> + emqx_client:subscribe(ClientPid, {Topic, Qos}), + {reply, ok, State#state{subscriptions = [{Topic, Qos} | Subscriptions]}}; + false -> + {reply, {error, already_exists}, State} + end; + +handle_call({del_subscription, Topic}, _From, State = #state{subscriptions = Subscriptions, client_pid = ClientPid}) -> + case lists:keymember(Topic, 1, Subscriptions) of + true -> + emqx_client:unsubscribe(ClientPid, Topic), + {reply, ok, State#state{subscriptions = lists:keydelete(Topic, 1, Subscriptions)}}; + false -> + {reply, ok, State} + end; handle_call(Req, _From, State) -> emqx_logger:error("[Bridge] unexpected call: ~p", [Req]), @@ -99,46 +184,24 @@ handle_cast(Msg, State) -> emqx_logger:error("[Bridge] unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info(start, State = #state{reconnect_count = 0}) -> - {noreply, State}; - %%---------------------------------------------------------------- -%% start in message bridge +%% start message bridge %%---------------------------------------------------------------- handle_info(start, State = #state{options = Options, client_pid = undefined, - reconnect_time = ReconnectTime, - reconnect_count = ReconnectCount, - type = in}) -> + reconnect_interval = ReconnectInterval}) -> case emqx_client:start_link([{owner, self()}|options(Options)]) of {ok, ClientPid, _} -> - Subs = get_value(subscriptions, Options, []), - [emqx_client:subscribe(ClientPid, {i2b(Topic), Qos}) || {Topic, Qos} <- Subs], - {noreply, State#state{client_pid = ClientPid}}; + Subs = [{i2b(Topic), Qos} || {Topic, Qos} <- get_value(subscriptions, Options, []), + emqx_topic:validate({filter, i2b(Topic)})], + Forwards = [i2b(Topic) || Topic <- string:tokens(get_value(forwards, Options, ""), ","), + emqx_topic:validate({filter, i2b(Topic)})], + [emqx_client:subscribe(ClientPid, {Topic, Qos}) || {Topic, Qos} <- Subs], + [emqx_broker:subscribe(Topic) || Topic <- Forwards], + {noreply, State#state{client_pid = ClientPid, subscriptions = Subs, forwards = Forwards}}; {error,_} -> - erlang:send_after(ReconnectTime, self(), start), - {noreply, State#state{reconnect_count = ReconnectCount-1}} - end; - -%%---------------------------------------------------------------- -%% start out message bridge -%%---------------------------------------------------------------- -handle_info(start, State = #state{options = Options, - client_pid = undefined, - reconnect_time = ReconnectTime, - reconnect_count = ReconnectCount, - type = out}) -> - case emqx_client:start_link([{owner, self()}|options(Options)]) of - {ok, ClientPid, _} -> - Subs = get_value(subscriptions, Options, []), - [emqx_client:subscribe(ClientPid, {i2b(Topic), Qos}) || {Topic, Qos} <- Subs], - ForwardRules = string:tokens(get_value(forward_rule, Options, ""), ","), - [emqx_broker:subscribe(i2b(Topic)) || Topic <- ForwardRules, - emqx_topic:validate({filter, i2b(Topic)})], - {noreply, State#state{client_pid = ClientPid}}; - {error,_} -> - erlang:send_after(ReconnectTime, self(), start), - {noreply, State#state{reconnect_count = ReconnectCount-1}} + erlang:send_after(ReconnectInterval, self(), start), + {noreply, State} end; %%---------------------------------------------------------------- @@ -146,14 +209,14 @@ handle_info(start, State = #state{options = Options, %%---------------------------------------------------------------- handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{retain := Retain}}}, State = #state{client_pid = Pid, mountpoint = Mountpoint, queue = Queue, - store_type = StoreType, max_pending_messages = MaxPendingMsg}) -> + mqueue_type = MqueueType, max_pending_messages = MaxPendingMsg}) -> Msg = #mqtt_msg{qos = 1, retain = Retain, topic = mountpoint(Mountpoint, Topic), payload = Payload}, case emqx_client:publish(Pid, Msg) of {ok, PkgId} -> - {noreply, State#state{queue = store(StoreType, {PkgId, Msg}, Queue, MaxPendingMsg)}}; + {noreply, State#state{queue = store(MqueueType, {PkgId, Msg}, Queue, MaxPendingMsg)}}; {error, Reason} -> emqx_logger:error("Publish fail:~p", [Reason]), {noreply, State} @@ -165,26 +228,25 @@ handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{r handle_info({publish, #{qos := QoS, dup := Dup, retain := Retain, topic := Topic, properties := Props, payload := Payload}}, State) -> NewMsg0 = emqx_message:make(bridge, QoS, Topic, Payload), - NewMsg1 = emqx_message:set_headers(Props, emqx_message:set_flags(#{dup => Dup, retain=> Retain}, NewMsg0)), + NewMsg1 = emqx_message:set_headers(Props, emqx_message:set_flags(#{dup => Dup, retain => Retain}, NewMsg0)), emqx_broker:publish(NewMsg1), {noreply, State}; %%---------------------------------------------------------------- %% received remote puback message %%---------------------------------------------------------------- -handle_info({puback, #{packet_id := PkgId}}, State = #state{queue = Queue, store_type = StoreType}) -> +handle_info({puback, #{packet_id := PkgId}}, State = #state{queue = Queue, mqueue_type = MqueueType}) -> % lists:keydelete(PkgId, 1, Queue) - {noreply, State#state{queue = delete(StoreType, PkgId, Queue)}}; + {noreply, State#state{queue = delete(MqueueType, PkgId, Queue)}}; handle_info({'EXIT', Pid, normal}, State = #state{client_pid = Pid}) -> {noreply, State#state{client_pid = undefined}}; handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = Pid, - reconnect_time = ReconnectTime, - def_reconnect_count = DefReconnectCount}) -> + reconnect_interval = ReconnectInterval}) -> lager:warning("emqx bridge stop reason:~p", [Reason]), - erlang:send_after(ReconnectTime, self(), start), - {noreply, State#state{client_pid = undefined, reconnect_count = DefReconnectCount}}; + erlang:send_after(ReconnectInterval, self(), start), + {noreply, State#state{client_pid = undefined}}; handle_info(Info, State) -> emqx_logger:error("[Bridge] unexpected info: ~p", [Info]), @@ -196,9 +258,9 @@ terminate(_Reason, #state{}) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -proto_ver(mqtt3) -> v3; -proto_ver(mqtt4) -> v4; -proto_ver(mqtt5) -> v5. +proto_ver(mqttv3) -> v3; +proto_ver(mqttv4) -> v4; +proto_ver(mqttv5) -> v5. address(Address) -> case string:tokens(Address, ":") of [Host] -> {Host, 1883}; diff --git a/src/emqx_bridge_sup.erl b/src/emqx_bridge_sup.erl index bc8c0a532..3911da2a6 100644 --- a/src/emqx_bridge_sup.erl +++ b/src/emqx_bridge_sup.erl @@ -27,7 +27,7 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). %% @doc List all bridges --spec(bridges() -> [{node(), Status :: binary()}]). +-spec(bridges() -> [{node(), map()}]). bridges() -> [{Name, emqx_bridge:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?MODULE)]. diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 85d6ca59d..7ef9c5968 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -787,7 +787,7 @@ connected(cast, ?SUBACK_PACKET(PacketId, Properties, ReasonCodes), connected(cast, ?UNSUBACK_PACKET(PacketId, Properties, ReasonCodes), State = #state{subscriptions = Subscriptions}) -> case take_call({unsubscribe, PacketId}, State) of - {value, #call{from = From, req = {_, Topics}}, NewState} -> + {value, #call{from = From, req = {_, _, Topics}}, NewState} -> Subscriptions1 = lists:foldl(fun(Topic, Acc) -> maps:remove(Topic, Acc)