diff --git a/src/emqx.erl b/src/emqx.erl index 59725b52e..ea22a31d3 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -181,7 +181,7 @@ shutdown() -> shutdown(normal). shutdown(Reason) -> - ?LOG(info, "[EMQ X] emqx shutdown for ~s", [Reason]), + ?LOG(critical, "[EMQ X] emqx shutdown for ~s", [Reason]), emqx_alarm_handler:unload(), emqx_plugins:unload(), lists:foreach(fun application:stop/1, [emqx, ekka, cowboy, ranch, esockd, gproc]). diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index 740f52b98..a15851f0f 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -276,7 +276,8 @@ init(Config) -> forwards => Topics, subscriptions => Subs, replayq => Queue, - inflight => [] + inflight => [], + connection => undefined }}. code_change(_Vsn, State, Data, _Extra) -> @@ -370,7 +371,7 @@ connected(info, {disconnected, ConnRef, Reason}, true -> ?LOG(info, "[Bridge] Bridge ~p diconnected~nreason=~p", [name(), Reason]), {next_state, connecting, - State#{conn_ref := undefined, connection := undefined}}; + State#{conn_ref => undefined, connection => undefined}}; false -> keep_state_and_data end; @@ -446,6 +447,9 @@ is_topic_present(Topic, Topics) -> do_ensure_present(forwards, Topic, _) -> ok = subscribe_local_topic(Topic); +do_ensure_present(subscriptions, _Topic, #{connect_module := _ConnectModule, + connection := undefined}) -> + {error, no_connection}; do_ensure_present(subscriptions, {Topic, QoS}, #{connect_module := ConnectModule, connection := Conn}) -> case erlang:function_exported(ConnectModule, ensure_subscribed, 3) of @@ -458,6 +462,9 @@ do_ensure_present(subscriptions, {Topic, QoS}, do_ensure_absent(forwards, Topic, _) -> ok = emqx_broker:unsubscribe(Topic); +do_ensure_absent(subscriptions, _Topic, #{connect_module := _ConnectModule, + connection := undefined}) -> + {error, no_connection}; do_ensure_absent(subscriptions, Topic, #{connect_module := ConnectModule, connection := Conn}) -> case erlang:function_exported(ConnectModule, ensure_unsubscribed, 2) of diff --git a/src/emqx_bridge_mqtt.erl b/src/emqx_bridge_mqtt.erl index 10f56d7c5..870efe51e 100644 --- a/src/emqx_bridge_mqtt.erl +++ b/src/emqx_bridge_mqtt.erl @@ -84,13 +84,19 @@ stop(Ref, #{ack_collector := AckCollector, client_pid := Pid}) -> ok. ensure_subscribed(#{client_pid := Pid}, Topic, QoS) when is_pid(Pid) -> - emqx_client:subscribe(Pid, Topic, QoS); + case emqx_client:subscribe(Pid, Topic, QoS) of + {ok, _, _} -> ok; + Error -> Error + end; ensure_subscribed(_Conn, _Topic, _QoS) -> %% return ok for now, next re-connect should should call start with new topic added to config ok. ensure_unsubscribed(#{client_pid := Pid}, Topic) when is_pid(Pid) -> - emqx_client:unsubscribe(Pid, Topic); + case emqx_client:unsubscribe(Pid, Topic) of + {ok, _, _} -> ok; + Error -> Error + end; ensure_unsubscribed(_, _) -> %% return ok for now, next re-connect should should call start with this topic deleted from config ok. @@ -188,4 +194,3 @@ subscribe_remote_topics(ClientPid, Subscriptions) -> Error -> throw(Error) end end, Subscriptions). -