diff --git a/src/emqx_message.erl b/src/emqx_message.erl index d4f490ed7..23ddd69d4 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -16,12 +16,24 @@ -include("emqx.hrl"). -include("emqx_mqtt.hrl"). +-include("types.hrl"). +%% Create -export([ make/2 , make/3 , make/4 ]). +%% Fields +-export([ id/1 + , qos/1 + , from/1 + , topic/1 + , payload/1 + , timestamp/1 + ]). + +%% Flags -export([ get_flag/2 , get_flag/3 , set_flag/2 @@ -30,6 +42,7 @@ , unset_flag/2 ]). +%% Headers -export([ get_headers/1 , get_header/2 , get_header/3 @@ -44,8 +57,6 @@ -export([ to_map/1 , to_list/1 - , to_bin_key_map/1 - , to_bin_key_list/1 ]). -export([format/1]). @@ -56,14 +67,17 @@ make(Topic, Payload) -> make(undefined, Topic, Payload). --spec(make(atom() | emqx_types:client_id(), emqx_topic:topic(), emqx_types:payload()) - -> emqx_types:message()). +-spec(make(atom() | emqx_types:client_id(), + emqx_topic:topic(), + emqx_types:payload()) -> emqx_types:message()). make(From, Topic, Payload) -> make(From, ?QOS_0, Topic, Payload). --spec(make(atom() | emqx_types:client_id(), emqx_mqtt_types:qos(), - emqx_topic:topic(), emqx_types:payload()) -> emqx_types:message()). -make(From, QoS, Topic, Payload) -> +-spec(make(atom() | emqx_types:client_id(), + emqx_mqtt_types:qos(), + emqx_topic:topic(), + emqx_types:payload()) -> emqx_types:message()). +make(From, QoS, Topic, Payload) when ?QOS_0 =< QoS, QoS =< ?QOS_2 -> #message{id = emqx_guid:gen(), qos = QoS, from = From, @@ -72,6 +86,24 @@ make(From, QoS, Topic, Payload) -> payload = Payload, timestamp = os:timestamp()}. +-spec(id(emqx_types:message()) -> maybe(binary())). +id(#message{id = Id}) -> Id. + +-spec(qos(emqx_types:message()) -> emqx_mqtt_types:qos()). +qos(#message{qos = QoS}) -> QoS. + +-spec(from(emqx_types:message()) -> atom() | binary()). +from(#message{from = From}) -> From. + +-spec(topic(emqx_types:message()) -> emqx_types:topic()). +topic(#message{topic = Topic}) -> Topic. + +-spec(payload(emqx_types:message()) -> emqx_types:payload()). +payload(#message{payload = Payload}) -> Payload. + +-spec(timestamp(emqx_types:message()) -> erlang:timestamp()). +timestamp(#message{timestamp = TS}) -> TS. + -spec(set_flags(map(), emqx_types:message()) -> emqx_types:message()). set_flags(Flags, Msg = #message{flags = undefined}) when is_map(Flags) -> Msg#message{flags = Flags}; @@ -159,24 +191,11 @@ update_expiry(Msg) -> Msg. to_map(Msg) -> maps:from_list(to_list(Msg)). -%% @doc Message to map --spec(to_bin_key_map(emqx_types:message()) -> #{binary() => any()}). -to_bin_key_map(Msg) -> - maps:from_list(to_bin_key_list(Msg)). - %% @doc Message to tuple list -spec(to_list(emqx_types:message()) -> map()). to_list(Msg) -> lists:zip(record_info(fields, message), tl(tuple_to_list(Msg))). -%% @doc Message to tuple list --spec(to_bin_key_list(emqx_types:message()) -> map()). -to_bin_key_list(Msg) -> - lists:zipwith( - fun(Key, Val) -> - {bin(Key), bin_key_map(Val)} - end, record_info(fields, message), tl(tuple_to_list(Msg))). - %% MilliSeconds elapsed(Since) -> max(0, timer:now_diff(os:timestamp(), Since) div 1000). @@ -191,14 +210,3 @@ format(flags, Flags) -> io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]); format(headers, Headers) -> io_lib:format("~p", [Headers]). - -bin_key_map(Map) when is_map(Map) -> - maps:fold(fun(Key, Val, Acc) -> - Acc#{bin(Key) => bin_key_map(Val)} - end, #{}, Map); -bin_key_map(Data) -> - Data. - -bin(Bin) when is_binary(Bin) -> Bin; -bin(Atom) when is_atom(Atom) -> list_to_binary(atom_to_list(Atom)); -bin(Str) when is_list(Str) -> list_to_binary(Str). diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index 594599daf..f13ccdd34 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -124,8 +124,8 @@ stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) -> %% @doc Enqueue a message. -spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}). -in(#message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) -> - {_Dropped = undefined, MQ}; +in(Msg = #message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) -> + {_Dropped = Msg, MQ}; in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp, p_table = PTab, q = Q, diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index 3e5b0ec0c..ba28bddac 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -140,7 +140,7 @@ publish_props(Headers) -> %% @doc Message from Packet -spec(to_message(emqx_types:credentials(), emqx_mqtt_types:packet()) -> emqx_types:message()). -to_message(#{client_id := ClientId, username := Username}, +to_message(#{client_id := ClientId, username := Username, peername := Peername}, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, retain = Retain, qos = QoS, @@ -150,7 +150,8 @@ to_message(#{client_id := ClientId, username := Username}, payload = Payload}) -> Msg = emqx_message:make(ClientId, QoS, Topic, Payload), Msg#message{flags = #{dup => Dup, retain => Retain}, - headers = merge_props(#{username => Username}, Props)}. + headers = merge_props(#{username => Username, + peername => Peername}, Props)}. -spec(will_msg(#mqtt_packet_connect{}) -> emqx_types:message()). will_msg(#mqtt_packet_connect{will_flag = false}) -> diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index ee12ed96a..af7a32f9a 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -276,7 +276,11 @@ plugin_unloaded(_Name, false) -> ok; plugin_unloaded(Name, true) -> case read_loaded() of - {ok, Names} -> + {ok, Names0} -> + Names = lists:filtermap(fun(Name1) when is_atom(Name1) -> {true, Name1}; + ({Name1, true}) -> {true, Name1}; + ({_Name1, false}) -> false + end, Names0), case lists:member(Name, Names) of true -> write_loaded(lists:delete(Name, Names)); @@ -306,4 +310,3 @@ write_loaded(AppNames) -> ?LOG(error, "[Plugins] Open File ~p Error: ~p", [File, Error]), {error, Error} end. - diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 3ef9107a7..2ef64c9aa 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -812,7 +812,11 @@ is_awaiting_full(#state{awaiting_rel = AwaitingRel, %% Dispatch messages %%------------------------------------------------------------------------------ -handle_dispatch(Msgs, State = #state{inflight = Inflight, subscriptions = SubMap}) -> +handle_dispatch(Msgs, State = #state{inflight = Inflight, + client_id = ClientId, + username = Username, + subscriptions = SubMap}) -> + SessProps = #{client_id => ClientId, username => Username}, %% Drain the mailbox and batch deliver Msgs1 = drain_m(batch_n(Inflight), Msgs), %% Ack the messages for shared subscription @@ -823,7 +827,9 @@ handle_dispatch(Msgs, State = #state{inflight = Inflight, subscriptions = SubMap SubOpts = find_subopts(Topic, SubMap), case process_subopts(SubOpts, Msg, State) of {ok, Msg1} -> [Msg1|Acc]; - ignore -> Acc + ignore -> + emqx_hooks:run('message.dropped', [SessProps, Msg]), + Acc end end, [], Msgs2), NState = batch_process(Msgs3, State), @@ -957,7 +963,7 @@ enqueue_msg(Msg, State = #state{mqueue = Q, client_id = ClientId, username = Use if Dropped =/= undefined -> SessProps = #{client_id => ClientId, username => Username}, - ok = emqx_hooks:run('message.dropped', [SessProps, Msg]); + ok = emqx_hooks:run('message.dropped', [SessProps, Dropped]); true -> ok end, State#state{mqueue = NewQ}. diff --git a/src/emqx_stats.erl b/src/emqx_stats.erl index 926db5b04..bec56f142 100644 --- a/src/emqx_stats.erl +++ b/src/emqx_stats.erl @@ -228,7 +228,7 @@ handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Update {noreply, start_timer(State#state{updates = Updates1}), hibernate}; handle_info(Info, State) -> - ?LOG("error, [Stats] Unexpected info: ~p", [Info]), + ?LOG(error, "[Stats] Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #state{timer = TRef}) -> diff --git a/src/emqx_sys.erl b/src/emqx_sys.erl index e5b718b05..21dfd58df 100644 --- a/src/emqx_sys.erl +++ b/src/emqx_sys.erl @@ -172,7 +172,7 @@ uptime(days, D) -> publish(uptime, Uptime) -> safe_publish(systop(uptime), Uptime); publish(datetime, Datetime) -> - safe_publish(systop(datatype), Datetime); + safe_publish(systop(datetime), Datetime); publish(version, Version) -> safe_publish(systop(version), #{retain => true}, Version); publish(sysdescr, Descr) -> diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index e3094e11c..92ae6a7dd 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -113,7 +113,7 @@ call(WSPid, Req) when is_pid(WSPid) -> %%------------------------------------------------------------------------------ init(Req, Opts) -> - IdleTimeout = proplists:get_value(idle_timeout, Opts, 60000), + IdleTimeout = proplists:get_value(idle_timeout, Opts, 7200000), DeflateOptions = maps:from_list(proplists:get_value(deflate_options, Opts, [])), MaxFrameSize = case proplists:get_value(max_frame_size, Opts, 0) of 0 -> infinity; diff --git a/test/emqx_bridge_SUITE.erl b/test/emqx_bridge_SUITE.erl index 238871d05..b33a64210 100644 --- a/test/emqx_bridge_SUITE.erl +++ b/test/emqx_bridge_SUITE.erl @@ -87,6 +87,7 @@ t_rpc(Config) when is_list(Config) -> %% message from a different client, to avoid getting terminated by no-local Msg1 = emqx_message:make(<<"ClientId-2">>, ?QOS_2, <<"t_rpc/one">>, <<"hello">>), ok = emqx_session:subscribe(SPid, [{<<"forwarded/t_rpc/one">>, #{qos => ?QOS_1}}]), + ct:sleep(100), PacketId = 1, emqx_session:publish(SPid, PacketId, Msg1), ?wait(case emqx_mock_client:get_last_message(ConnPid) of diff --git a/test/emqx_message_SUITE.erl b/test/emqx_message_SUITE.erl index 74f6702ce..79e34bfb5 100644 --- a/test/emqx_message_SUITE.erl +++ b/test/emqx_message_SUITE.erl @@ -14,34 +14,38 @@ -module(emqx_message_SUITE). --compile(export_all). --compile(nowarn_export_all). - -include("emqx.hrl"). - -include("emqx_mqtt.hrl"). - -include_lib("eunit/include/eunit.hrl"). -all() -> - [ message_make - , message_flag - , message_header - , message_format - , message_expired - , message_to_map - ]. +-export([ t_make/1 + , t_flag/1 + , t_header/1 + , t_format/1 + , t_expired/1 + , t_to_map/1 + ]). -message_make(_) -> - Msg = emqx_message:make(<<"clientid">>, <<"payload">>), - ?assertEqual(0, Msg#message.qos), +-export([ all/0 + , suite/0 + ]). + +t_make(_) -> + Msg = emqx_message:make(<<"topic">>, <<"payload">>), + ?assertEqual(0, emqx_message:qos(Msg)), + ?assertEqual(undefined, emqx_message:from(Msg)), + ?assertEqual(<<"payload">>, emqx_message:payload(Msg)), Msg1 = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), - ?assertEqual(0, Msg1#message.qos), + ?assertEqual(0, emqx_message:qos(Msg1)), + ?assertEqual(<<"topic">>, emqx_message:topic(Msg1)), Msg2 = emqx_message:make(<<"clientid">>, ?QOS_2, <<"topic">>, <<"payload">>), - ?assert(is_binary(Msg2#message.id)), - ?assertEqual(2, Msg2#message.qos). + ?assert(is_binary(emqx_message:id(Msg2))), + ?assertEqual(2, emqx_message:qos(Msg2)), + ?assertEqual(<<"clientid">>, emqx_message:from(Msg2)), + ?assertEqual(<<"topic">>, emqx_message:topic(Msg2)), + ?assertEqual(<<"payload">>, emqx_message:payload(Msg2)). -message_flag(_) -> +t_flag(_) -> Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), Msg2 = emqx_message:set_flag(retain, false, Msg), Msg3 = emqx_message:set_flag(dup, Msg2), @@ -55,7 +59,7 @@ message_flag(_) -> ?assert(emqx_message:get_flag(dup, Msg6)), ?assert(emqx_message:get_flag(retain, Msg6)). -message_header(_) -> +t_header(_) -> Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), Msg1 = emqx_message:set_headers(#{a => 1, b => 2}, Msg), Msg2 = emqx_message:set_header(c, 3, Msg1), @@ -64,10 +68,10 @@ message_header(_) -> Msg3 = emqx_message:remove_header(a, Msg2), ?assertEqual(#{b => 2, c => 3}, emqx_message:get_headers(Msg3)). -message_format(_) -> +t_format(_) -> io:format("~s", [emqx_message:format(emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>))]). -message_expired(_) -> +t_expired(_) -> Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), Msg1 = emqx_message:set_headers(#{'Message-Expiry-Interval' => 1}, Msg), timer:sleep(500), @@ -78,7 +82,7 @@ message_expired(_) -> Msg2 = emqx_message:update_expiry(Msg1), ?assertEqual(1, emqx_message:get_header('Message-Expiry-Interval', Msg2)). -message_to_map(_) -> +t_to_map(_) -> Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"topic">>, <<"payload">>), List = [{id, Msg#message.id}, {qos, ?QOS_1}, @@ -91,3 +95,10 @@ message_to_map(_) -> ?assertEqual(List, emqx_message:to_list(Msg)), ?assertEqual(maps:from_list(List), emqx_message:to_map(Msg)). +all() -> + IsTestCase = fun("t_" ++ _) -> true; (_) -> false end, + [F || {F, _A} <- module_info(exports), IsTestCase(atom_to_list(F))]. + +suite() -> + [{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}]. + diff --git a/test/emqx_packet_SUITE.erl b/test/emqx_packet_SUITE.erl index 4cb87bd18..07fb6c58d 100644 --- a/test/emqx_packet_SUITE.erl +++ b/test/emqx_packet_SUITE.erl @@ -100,8 +100,13 @@ packet_message(_) -> Msg = emqx_message:make(<<"clientid">>, ?QOS_0, <<"topic">>, <<"payload">>), Msg2 = emqx_message:set_flag(retain, false, Msg), Pkt = emqx_packet:from_message(10, Msg2), - Msg3 = emqx_message:set_header(username, "test", Msg2), - Msg4 = emqx_packet:to_message(#{client_id => <<"clientid">>, username => "test"}, Pkt), + Msg3 = emqx_message:set_header( + peername, {{127,0,0,1}, 9527}, + emqx_message:set_header(username, "test", Msg2) + ), + Msg4 = emqx_packet:to_message(#{client_id => <<"clientid">>, + username => "test", + peername => {{127,0,0,1}, 9527}}, Pkt), Msg5 = Msg4#message{timestamp = Msg3#message.timestamp, id = Msg3#message.id}, Msg5 = Msg3.