From b6fa4d2a3f8c8f53c795fa69237315e7b2f8db97 Mon Sep 17 00:00:00 2001 From: Gilbert Date: Fri, 29 Mar 2019 10:19:11 +0800 Subject: [PATCH 1/2] Fix bridge subscribe bugs --- src/emqx_bridge.erl | 11 +++++++++-- src/emqx_bridge_mqtt.erl | 11 ++++++++--- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index 740f52b98..a15851f0f 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -276,7 +276,8 @@ init(Config) -> forwards => Topics, subscriptions => Subs, replayq => Queue, - inflight => [] + inflight => [], + connection => undefined }}. code_change(_Vsn, State, Data, _Extra) -> @@ -370,7 +371,7 @@ connected(info, {disconnected, ConnRef, Reason}, true -> ?LOG(info, "[Bridge] Bridge ~p diconnected~nreason=~p", [name(), Reason]), {next_state, connecting, - State#{conn_ref := undefined, connection := undefined}}; + State#{conn_ref => undefined, connection => undefined}}; false -> keep_state_and_data end; @@ -446,6 +447,9 @@ is_topic_present(Topic, Topics) -> do_ensure_present(forwards, Topic, _) -> ok = subscribe_local_topic(Topic); +do_ensure_present(subscriptions, _Topic, #{connect_module := _ConnectModule, + connection := undefined}) -> + {error, no_connection}; do_ensure_present(subscriptions, {Topic, QoS}, #{connect_module := ConnectModule, connection := Conn}) -> case erlang:function_exported(ConnectModule, ensure_subscribed, 3) of @@ -458,6 +462,9 @@ do_ensure_present(subscriptions, {Topic, QoS}, do_ensure_absent(forwards, Topic, _) -> ok = emqx_broker:unsubscribe(Topic); +do_ensure_absent(subscriptions, _Topic, #{connect_module := _ConnectModule, + connection := undefined}) -> + {error, no_connection}; do_ensure_absent(subscriptions, Topic, #{connect_module := ConnectModule, connection := Conn}) -> case erlang:function_exported(ConnectModule, ensure_unsubscribed, 2) of diff --git a/src/emqx_bridge_mqtt.erl b/src/emqx_bridge_mqtt.erl index 10f56d7c5..870efe51e 100644 --- a/src/emqx_bridge_mqtt.erl +++ b/src/emqx_bridge_mqtt.erl @@ -84,13 +84,19 @@ stop(Ref, #{ack_collector := AckCollector, client_pid := Pid}) -> ok. ensure_subscribed(#{client_pid := Pid}, Topic, QoS) when is_pid(Pid) -> - emqx_client:subscribe(Pid, Topic, QoS); + case emqx_client:subscribe(Pid, Topic, QoS) of + {ok, _, _} -> ok; + Error -> Error + end; ensure_subscribed(_Conn, _Topic, _QoS) -> %% return ok for now, next re-connect should should call start with new topic added to config ok. ensure_unsubscribed(#{client_pid := Pid}, Topic) when is_pid(Pid) -> - emqx_client:unsubscribe(Pid, Topic); + case emqx_client:unsubscribe(Pid, Topic) of + {ok, _, _} -> ok; + Error -> Error + end; ensure_unsubscribed(_, _) -> %% return ok for now, next re-connect should should call start with this topic deleted from config ok. @@ -188,4 +194,3 @@ subscribe_remote_topics(ClientPid, Subscriptions) -> Error -> throw(Error) end end, Subscriptions). - From 0c64da6da7fbf03abba8e6ad233a1cbd1309be7c Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Fri, 29 Mar 2019 10:56:01 +0800 Subject: [PATCH 2/2] Change default log level from info to critical when shutdown --- src/emqx.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/emqx.erl b/src/emqx.erl index 48a65d666..ea22a31d3 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -181,7 +181,7 @@ shutdown() -> shutdown(normal). shutdown(Reason) -> - ?LOG(info, "[EMQ X] emqx shutdown for ~s", [Reason]), + ?LOG(critical, "[EMQ X] emqx shutdown for ~s", [Reason]), emqx_alarm_handler:unload(), emqx_plugins:unload(), lists:foreach(fun application:stop/1, [emqx, ekka, cowboy, ranch, esockd, gproc]). @@ -198,4 +198,3 @@ reload_config(ConfFile) -> lists:foreach(fun({App, Vals}) -> [application:set_env(App, Par, Val) || {Par, Val} <- Vals] end, Conf). -