Merge remote-tracking branch 'origin/develop'

This commit is contained in:
zhanghongtong 2019-04-04 11:05:42 +08:00
commit 2164c9149b
5 changed files with 17 additions and 34 deletions

View File

@ -57,8 +57,6 @@
-export([ to_map/1
, to_list/1
, to_bin_key_map/1
, to_bin_key_list/1
]).
-export([format/1]).
@ -193,24 +191,11 @@ update_expiry(Msg) -> Msg.
to_map(Msg) ->
maps:from_list(to_list(Msg)).
%% @doc Message to map
-spec(to_bin_key_map(emqx_types:message()) -> #{binary() => any()}).
to_bin_key_map(Msg) ->
maps:from_list(to_bin_key_list(Msg)).
%% @doc Message to tuple list
-spec(to_list(emqx_types:message()) -> map()).
to_list(Msg) ->
lists:zip(record_info(fields, message), tl(tuple_to_list(Msg))).
%% @doc Message to tuple list
-spec(to_bin_key_list(emqx_types:message()) -> map()).
to_bin_key_list(Msg) ->
lists:zipwith(
fun(Key, Val) ->
{bin(Key), bin_key_map(Val)}
end, record_info(fields, message), tl(tuple_to_list(Msg))).
%% MilliSeconds
elapsed(Since) ->
max(0, timer:now_diff(os:timestamp(), Since) div 1000).
@ -225,15 +210,3 @@ format(flags, Flags) ->
io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]);
format(headers, Headers) ->
io_lib:format("~p", [Headers]).
bin_key_map(Map) when is_map(Map) ->
maps:fold(fun(Key, Val, Acc) ->
Acc#{bin(Key) => bin_key_map(Val)}
end, #{}, Map);
bin_key_map(Data) ->
Data.
bin(Bin) when is_binary(Bin) -> Bin;
bin(Atom) when is_atom(Atom) -> list_to_binary(atom_to_list(Atom));
bin(Str) when is_list(Str) -> list_to_binary(Str).

View File

@ -124,8 +124,8 @@ stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) ->
%% @doc Enqueue a message.
-spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}).
in(#message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) ->
{_Dropped = undefined, MQ};
in(Msg = #message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) ->
{_Dropped = Msg, MQ};
in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp,
p_table = PTab,
q = Q,

View File

@ -276,7 +276,11 @@ plugin_unloaded(_Name, false) ->
ok;
plugin_unloaded(Name, true) ->
case read_loaded() of
{ok, Names} ->
{ok, Names0} ->
Names = lists:filtermap(fun(Name1) when is_atom(Name1) -> {true, Name1};
({Name1, true}) -> {true, Name1};
({_Name1, false}) -> false
end, Names0),
case lists:member(Name, Names) of
true ->
write_loaded(lists:delete(Name, Names));
@ -306,4 +310,3 @@ write_loaded(AppNames) ->
?LOG(error, "[Plugins] Open File ~p Error: ~p", [File, Error]),
{error, Error}
end.

View File

@ -812,7 +812,11 @@ is_awaiting_full(#state{awaiting_rel = AwaitingRel,
%% Dispatch messages
%%------------------------------------------------------------------------------
handle_dispatch(Msgs, State = #state{inflight = Inflight, subscriptions = SubMap}) ->
handle_dispatch(Msgs, State = #state{inflight = Inflight,
client_id = ClientId,
username = Username,
subscriptions = SubMap}) ->
SessProps = #{client_id => ClientId, username => Username},
%% Drain the mailbox and batch deliver
Msgs1 = drain_m(batch_n(Inflight), Msgs),
%% Ack the messages for shared subscription
@ -823,7 +827,9 @@ handle_dispatch(Msgs, State = #state{inflight = Inflight, subscriptions = SubMap
SubOpts = find_subopts(Topic, SubMap),
case process_subopts(SubOpts, Msg, State) of
{ok, Msg1} -> [Msg1|Acc];
ignore -> Acc
ignore ->
emqx_hooks:run('message.dropped', [SessProps, Msg]),
Acc
end
end, [], Msgs2),
NState = batch_process(Msgs3, State),
@ -957,7 +963,7 @@ enqueue_msg(Msg, State = #state{mqueue = Q, client_id = ClientId, username = Use
if
Dropped =/= undefined ->
SessProps = #{client_id => ClientId, username => Username},
ok = emqx_hooks:run('message.dropped', [SessProps, Msg]);
ok = emqx_hooks:run('message.dropped', [SessProps, Dropped]);
true -> ok
end,
State#state{mqueue = NewQ}.

View File

@ -87,6 +87,7 @@ t_rpc(Config) when is_list(Config) ->
%% message from a different client, to avoid getting terminated by no-local
Msg1 = emqx_message:make(<<"ClientId-2">>, ?QOS_2, <<"t_rpc/one">>, <<"hello">>),
ok = emqx_session:subscribe(SPid, [{<<"forwarded/t_rpc/one">>, #{qos => ?QOS_1}}]),
ct:sleep(100),
PacketId = 1,
emqx_session:publish(SPid, PacketId, Msg1),
?wait(case emqx_mock_client:get_last_message(ConnPid) of