diff --git a/etc/emqx.conf b/etc/emqx.conf index a47abddf7..fc37427e6 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -2043,3 +2043,5 @@ sysmon.busy_port = false ## ## Value: true | false sysmon.busy_dist_port = true + +{{ additional_configs }} diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 5d5381f6e..ed33df3ce 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -451,21 +451,21 @@ handle_call(Req, _From, State) -> %% SUBSCRIBE: handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}}, - State = #state{client_id = ClientId, subscriptions = Subscriptions}) -> + State = #state{client_id = ClientId, username = Username, subscriptions = Subscriptions}) -> {ReasonCodes, Subscriptions1} = lists:foldr(fun({Topic, SubOpts = #{qos := QoS}}, {RcAcc, SubMap}) -> {[QoS|RcAcc], case maps:find(Topic, SubMap) of {ok, SubOpts} -> - emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts#{first => false}]), + emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => false}]), SubMap; {ok, _SubOpts} -> emqx_broker:set_subopts(Topic, SubOpts), %% Why??? - emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts#{first => false}]), + emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => false}]), maps:put(Topic, SubOpts, SubMap); error -> emqx_broker:subscribe(Topic, ClientId, SubOpts), - emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts#{first => true}]), + emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => true}]), maps:put(Topic, SubOpts, SubMap) end} end, {[], Subscriptions}, TopicFilters), @@ -474,13 +474,13 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}}, %% UNSUBSCRIBE: handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}}, - State = #state{client_id = ClientId, subscriptions = Subscriptions}) -> + State = #state{client_id = ClientId, username = Username, subscriptions = Subscriptions}) -> {ReasonCodes, Subscriptions1} = lists:foldr(fun({Topic, _SubOpts}, {Acc, SubMap}) -> case maps:find(Topic, SubMap) of {ok, SubOpts} -> ok = emqx_broker:unsubscribe(Topic), - emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId}, Topic, SubOpts]), + emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts]), {[?RC_SUCCESS|Acc], maps:remove(Topic, SubMap)}; error -> {[?RC_NO_SUBSCRIPTION_EXISTED|Acc], SubMap} @@ -654,11 +654,12 @@ handle_info(Info, State) -> terminate(Reason, #state{will_msg = WillMsg, client_id = ClientId, + username = Username, conn_pid = ConnPid, old_conn_pid = OldConnPid}) -> send_willmsg(WillMsg), [maybe_shutdown(Pid, Reason) || Pid <- [ConnPid, OldConnPid]], - emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]). + emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -823,8 +824,8 @@ run_dispatch_steps([{subid, SubId}|Steps], Msg, State) -> run_dispatch_steps(Steps, emqx_message:set_header('Subscription-Identifier', SubId, Msg), State). %% Enqueue message if the client has been disconnected -dispatch(Msg, State = #state{client_id = ClientId, conn_pid = undefined}) -> - case emqx_hooks:run('message.dropped', [#{client_id => ClientId}, Msg]) of +dispatch(Msg, State = #state{client_id = ClientId, username = Username, conn_pid = undefined}) -> + case emqx_hooks:run('message.dropped', [#{client_id => ClientId, username => Username}, Msg]) of ok -> enqueue_msg(Msg, State); stop -> State end; @@ -888,20 +889,20 @@ await(PacketId, Msg, State = #state{inflight = Inflight}) -> PacketId, {publish, {PacketId, Msg}, os:timestamp()}, Inflight), ensure_retry_timer(State#state{inflight = Inflight1}). -acked(puback, PacketId, State = #state{client_id = ClientId, inflight = Inflight}) -> +acked(puback, PacketId, State = #state{client_id = ClientId, username = Username, inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {publish, {_, Msg}, _Ts}} -> - emqx_hooks:run('message.acked', [#{client_id => ClientId}], Msg), + emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}], Msg), State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}; none -> ?LOG(warning, "Duplicated PUBACK PacketId ~w", [PacketId], State), State end; -acked(pubrec, PacketId, State = #state{client_id = ClientId, inflight = Inflight}) -> +acked(pubrec, PacketId, State = #state{client_id = ClientId, username = Username, inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {publish, {_, Msg}, _Ts}} -> - emqx_hooks:run('message.acked', [#{client_id => ClientId}], Msg), + emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}], Msg), State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)}; {value, {pubrel, PacketId, _Ts}} -> ?LOG(warning, "Duplicated PUBREC PacketId ~w", [PacketId], State), diff --git a/src/emqx_time.erl b/src/emqx_time.erl index 95bfc9409..a7edf7efb 100644 --- a/src/emqx_time.erl +++ b/src/emqx_time.erl @@ -14,7 +14,7 @@ -module(emqx_time). --export([seed/0, now_secs/0, now_secs/1, now_ms/0, now_ms/1]). +-export([seed/0, now_secs/0, now_secs/1, now_ms/0, now_ms/1, ts_from_ms/1]). seed() -> rand:seed(exsplus, erlang:timestamp()). @@ -29,4 +29,7 @@ now_ms() -> erlang:system_time(millisecond). now_ms({MegaSecs, Secs, MicroSecs}) -> - (MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000). + (MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000). + +ts_from_ms(Ms) -> + {Ms div 1000000, Ms rem 1000000, 0}.