diff --git a/src/emqx_message.erl b/src/emqx_message.erl index 7e78a3dac..23ddd69d4 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -57,8 +57,6 @@ -export([ to_map/1 , to_list/1 - , to_bin_key_map/1 - , to_bin_key_list/1 ]). -export([format/1]). @@ -193,24 +191,11 @@ update_expiry(Msg) -> Msg. to_map(Msg) -> maps:from_list(to_list(Msg)). -%% @doc Message to map --spec(to_bin_key_map(emqx_types:message()) -> #{binary() => any()}). -to_bin_key_map(Msg) -> - maps:from_list(to_bin_key_list(Msg)). - %% @doc Message to tuple list -spec(to_list(emqx_types:message()) -> map()). to_list(Msg) -> lists:zip(record_info(fields, message), tl(tuple_to_list(Msg))). -%% @doc Message to tuple list --spec(to_bin_key_list(emqx_types:message()) -> map()). -to_bin_key_list(Msg) -> - lists:zipwith( - fun(Key, Val) -> - {bin(Key), bin_key_map(Val)} - end, record_info(fields, message), tl(tuple_to_list(Msg))). - %% MilliSeconds elapsed(Since) -> max(0, timer:now_diff(os:timestamp(), Since) div 1000). @@ -225,15 +210,3 @@ format(flags, Flags) -> io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]); format(headers, Headers) -> io_lib:format("~p", [Headers]). - -bin_key_map(Map) when is_map(Map) -> - maps:fold(fun(Key, Val, Acc) -> - Acc#{bin(Key) => bin_key_map(Val)} - end, #{}, Map); -bin_key_map(Data) -> - Data. - -bin(Bin) when is_binary(Bin) -> Bin; -bin(Atom) when is_atom(Atom) -> list_to_binary(atom_to_list(Atom)); -bin(Str) when is_list(Str) -> list_to_binary(Str). - diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index 594599daf..f13ccdd34 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -124,8 +124,8 @@ stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) -> %% @doc Enqueue a message. -spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}). -in(#message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) -> - {_Dropped = undefined, MQ}; +in(Msg = #message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) -> + {_Dropped = Msg, MQ}; in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp, p_table = PTab, q = Q, diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index ee12ed96a..af7a32f9a 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -276,7 +276,11 @@ plugin_unloaded(_Name, false) -> ok; plugin_unloaded(Name, true) -> case read_loaded() of - {ok, Names} -> + {ok, Names0} -> + Names = lists:filtermap(fun(Name1) when is_atom(Name1) -> {true, Name1}; + ({Name1, true}) -> {true, Name1}; + ({_Name1, false}) -> false + end, Names0), case lists:member(Name, Names) of true -> write_loaded(lists:delete(Name, Names)); @@ -306,4 +310,3 @@ write_loaded(AppNames) -> ?LOG(error, "[Plugins] Open File ~p Error: ~p", [File, Error]), {error, Error} end. - diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 3ef9107a7..2ef64c9aa 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -812,7 +812,11 @@ is_awaiting_full(#state{awaiting_rel = AwaitingRel, %% Dispatch messages %%------------------------------------------------------------------------------ -handle_dispatch(Msgs, State = #state{inflight = Inflight, subscriptions = SubMap}) -> +handle_dispatch(Msgs, State = #state{inflight = Inflight, + client_id = ClientId, + username = Username, + subscriptions = SubMap}) -> + SessProps = #{client_id => ClientId, username => Username}, %% Drain the mailbox and batch deliver Msgs1 = drain_m(batch_n(Inflight), Msgs), %% Ack the messages for shared subscription @@ -823,7 +827,9 @@ handle_dispatch(Msgs, State = #state{inflight = Inflight, subscriptions = SubMap SubOpts = find_subopts(Topic, SubMap), case process_subopts(SubOpts, Msg, State) of {ok, Msg1} -> [Msg1|Acc]; - ignore -> Acc + ignore -> + emqx_hooks:run('message.dropped', [SessProps, Msg]), + Acc end end, [], Msgs2), NState = batch_process(Msgs3, State), @@ -957,7 +963,7 @@ enqueue_msg(Msg, State = #state{mqueue = Q, client_id = ClientId, username = Use if Dropped =/= undefined -> SessProps = #{client_id => ClientId, username => Username}, - ok = emqx_hooks:run('message.dropped', [SessProps, Msg]); + ok = emqx_hooks:run('message.dropped', [SessProps, Dropped]); true -> ok end, State#state{mqueue = NewQ}. diff --git a/test/emqx_bridge_SUITE.erl b/test/emqx_bridge_SUITE.erl index 238871d05..b33a64210 100644 --- a/test/emqx_bridge_SUITE.erl +++ b/test/emqx_bridge_SUITE.erl @@ -87,6 +87,7 @@ t_rpc(Config) when is_list(Config) -> %% message from a different client, to avoid getting terminated by no-local Msg1 = emqx_message:make(<<"ClientId-2">>, ?QOS_2, <<"t_rpc/one">>, <<"hello">>), ok = emqx_session:subscribe(SPid, [{<<"forwarded/t_rpc/one">>, #{qos => ?QOS_1}}]), + ct:sleep(100), PacketId = 1, emqx_session:publish(SPid, PacketId, Msg1), ?wait(case emqx_mock_client:get_last_message(ConnPid) of