Merge remote-tracking branch 'origin/develop'
This commit is contained in:
commit
127427b783
|
@ -181,7 +181,7 @@ shutdown() ->
|
||||||
shutdown(normal).
|
shutdown(normal).
|
||||||
|
|
||||||
shutdown(Reason) ->
|
shutdown(Reason) ->
|
||||||
?LOG(info, "[EMQ X] emqx shutdown for ~s", [Reason]),
|
?LOG(critical, "[EMQ X] emqx shutdown for ~s", [Reason]),
|
||||||
emqx_alarm_handler:unload(),
|
emqx_alarm_handler:unload(),
|
||||||
emqx_plugins:unload(),
|
emqx_plugins:unload(),
|
||||||
lists:foreach(fun application:stop/1, [emqx, ekka, cowboy, ranch, esockd, gproc]).
|
lists:foreach(fun application:stop/1, [emqx, ekka, cowboy, ranch, esockd, gproc]).
|
||||||
|
|
|
@ -276,7 +276,8 @@ init(Config) ->
|
||||||
forwards => Topics,
|
forwards => Topics,
|
||||||
subscriptions => Subs,
|
subscriptions => Subs,
|
||||||
replayq => Queue,
|
replayq => Queue,
|
||||||
inflight => []
|
inflight => [],
|
||||||
|
connection => undefined
|
||||||
}}.
|
}}.
|
||||||
|
|
||||||
code_change(_Vsn, State, Data, _Extra) ->
|
code_change(_Vsn, State, Data, _Extra) ->
|
||||||
|
@ -370,7 +371,7 @@ connected(info, {disconnected, ConnRef, Reason},
|
||||||
true ->
|
true ->
|
||||||
?LOG(info, "[Bridge] Bridge ~p diconnected~nreason=~p", [name(), Reason]),
|
?LOG(info, "[Bridge] Bridge ~p diconnected~nreason=~p", [name(), Reason]),
|
||||||
{next_state, connecting,
|
{next_state, connecting,
|
||||||
State#{conn_ref := undefined, connection := undefined}};
|
State#{conn_ref => undefined, connection => undefined}};
|
||||||
false ->
|
false ->
|
||||||
keep_state_and_data
|
keep_state_and_data
|
||||||
end;
|
end;
|
||||||
|
@ -446,6 +447,9 @@ is_topic_present(Topic, Topics) ->
|
||||||
|
|
||||||
do_ensure_present(forwards, Topic, _) ->
|
do_ensure_present(forwards, Topic, _) ->
|
||||||
ok = subscribe_local_topic(Topic);
|
ok = subscribe_local_topic(Topic);
|
||||||
|
do_ensure_present(subscriptions, _Topic, #{connect_module := _ConnectModule,
|
||||||
|
connection := undefined}) ->
|
||||||
|
{error, no_connection};
|
||||||
do_ensure_present(subscriptions, {Topic, QoS},
|
do_ensure_present(subscriptions, {Topic, QoS},
|
||||||
#{connect_module := ConnectModule, connection := Conn}) ->
|
#{connect_module := ConnectModule, connection := Conn}) ->
|
||||||
case erlang:function_exported(ConnectModule, ensure_subscribed, 3) of
|
case erlang:function_exported(ConnectModule, ensure_subscribed, 3) of
|
||||||
|
@ -458,6 +462,9 @@ do_ensure_present(subscriptions, {Topic, QoS},
|
||||||
|
|
||||||
do_ensure_absent(forwards, Topic, _) ->
|
do_ensure_absent(forwards, Topic, _) ->
|
||||||
ok = emqx_broker:unsubscribe(Topic);
|
ok = emqx_broker:unsubscribe(Topic);
|
||||||
|
do_ensure_absent(subscriptions, _Topic, #{connect_module := _ConnectModule,
|
||||||
|
connection := undefined}) ->
|
||||||
|
{error, no_connection};
|
||||||
do_ensure_absent(subscriptions, Topic, #{connect_module := ConnectModule,
|
do_ensure_absent(subscriptions, Topic, #{connect_module := ConnectModule,
|
||||||
connection := Conn}) ->
|
connection := Conn}) ->
|
||||||
case erlang:function_exported(ConnectModule, ensure_unsubscribed, 2) of
|
case erlang:function_exported(ConnectModule, ensure_unsubscribed, 2) of
|
||||||
|
|
|
@ -84,13 +84,19 @@ stop(Ref, #{ack_collector := AckCollector, client_pid := Pid}) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
ensure_subscribed(#{client_pid := Pid}, Topic, QoS) when is_pid(Pid) ->
|
ensure_subscribed(#{client_pid := Pid}, Topic, QoS) when is_pid(Pid) ->
|
||||||
emqx_client:subscribe(Pid, Topic, QoS);
|
case emqx_client:subscribe(Pid, Topic, QoS) of
|
||||||
|
{ok, _, _} -> ok;
|
||||||
|
Error -> Error
|
||||||
|
end;
|
||||||
ensure_subscribed(_Conn, _Topic, _QoS) ->
|
ensure_subscribed(_Conn, _Topic, _QoS) ->
|
||||||
%% return ok for now, next re-connect should should call start with new topic added to config
|
%% return ok for now, next re-connect should should call start with new topic added to config
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
ensure_unsubscribed(#{client_pid := Pid}, Topic) when is_pid(Pid) ->
|
ensure_unsubscribed(#{client_pid := Pid}, Topic) when is_pid(Pid) ->
|
||||||
emqx_client:unsubscribe(Pid, Topic);
|
case emqx_client:unsubscribe(Pid, Topic) of
|
||||||
|
{ok, _, _} -> ok;
|
||||||
|
Error -> Error
|
||||||
|
end;
|
||||||
ensure_unsubscribed(_, _) ->
|
ensure_unsubscribed(_, _) ->
|
||||||
%% return ok for now, next re-connect should should call start with this topic deleted from config
|
%% return ok for now, next re-connect should should call start with this topic deleted from config
|
||||||
ok.
|
ok.
|
||||||
|
@ -188,4 +194,3 @@ subscribe_remote_topics(ClientPid, Subscriptions) ->
|
||||||
Error -> throw(Error)
|
Error -> throw(Error)
|
||||||
end
|
end
|
||||||
end, Subscriptions).
|
end, Subscriptions).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue