This commit is contained in:
Shawn 2019-01-03 13:54:31 +08:00 committed by GitHub
parent 78b3c375d8
commit 69954480bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 21 additions and 15 deletions

View File

@ -2043,3 +2043,5 @@ sysmon.busy_port = false
## ##
## Value: true | false ## Value: true | false
sysmon.busy_dist_port = true sysmon.busy_dist_port = true
{{ additional_configs }}

View File

@ -451,21 +451,21 @@ handle_call(Req, _From, State) ->
%% SUBSCRIBE: %% SUBSCRIBE:
handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}}, handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
State = #state{client_id = ClientId, subscriptions = Subscriptions}) -> State = #state{client_id = ClientId, username = Username, subscriptions = Subscriptions}) ->
{ReasonCodes, Subscriptions1} = {ReasonCodes, Subscriptions1} =
lists:foldr(fun({Topic, SubOpts = #{qos := QoS}}, {RcAcc, SubMap}) -> lists:foldr(fun({Topic, SubOpts = #{qos := QoS}}, {RcAcc, SubMap}) ->
{[QoS|RcAcc], case maps:find(Topic, SubMap) of {[QoS|RcAcc], case maps:find(Topic, SubMap) of
{ok, SubOpts} -> {ok, SubOpts} ->
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts#{first => false}]), emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => false}]),
SubMap; SubMap;
{ok, _SubOpts} -> {ok, _SubOpts} ->
emqx_broker:set_subopts(Topic, SubOpts), emqx_broker:set_subopts(Topic, SubOpts),
%% Why??? %% Why???
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts#{first => false}]), emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => false}]),
maps:put(Topic, SubOpts, SubMap); maps:put(Topic, SubOpts, SubMap);
error -> error ->
emqx_broker:subscribe(Topic, ClientId, SubOpts), emqx_broker:subscribe(Topic, ClientId, SubOpts),
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts#{first => true}]), emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => true}]),
maps:put(Topic, SubOpts, SubMap) maps:put(Topic, SubOpts, SubMap)
end} end}
end, {[], Subscriptions}, TopicFilters), end, {[], Subscriptions}, TopicFilters),
@ -474,13 +474,13 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
%% UNSUBSCRIBE: %% UNSUBSCRIBE:
handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}}, handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
State = #state{client_id = ClientId, subscriptions = Subscriptions}) -> State = #state{client_id = ClientId, username = Username, subscriptions = Subscriptions}) ->
{ReasonCodes, Subscriptions1} = {ReasonCodes, Subscriptions1} =
lists:foldr(fun({Topic, _SubOpts}, {Acc, SubMap}) -> lists:foldr(fun({Topic, _SubOpts}, {Acc, SubMap}) ->
case maps:find(Topic, SubMap) of case maps:find(Topic, SubMap) of
{ok, SubOpts} -> {ok, SubOpts} ->
ok = emqx_broker:unsubscribe(Topic), ok = emqx_broker:unsubscribe(Topic),
emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId}, Topic, SubOpts]), emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts]),
{[?RC_SUCCESS|Acc], maps:remove(Topic, SubMap)}; {[?RC_SUCCESS|Acc], maps:remove(Topic, SubMap)};
error -> error ->
{[?RC_NO_SUBSCRIPTION_EXISTED|Acc], SubMap} {[?RC_NO_SUBSCRIPTION_EXISTED|Acc], SubMap}
@ -654,11 +654,12 @@ handle_info(Info, State) ->
terminate(Reason, #state{will_msg = WillMsg, terminate(Reason, #state{will_msg = WillMsg,
client_id = ClientId, client_id = ClientId,
username = Username,
conn_pid = ConnPid, conn_pid = ConnPid,
old_conn_pid = OldConnPid}) -> old_conn_pid = OldConnPid}) ->
send_willmsg(WillMsg), send_willmsg(WillMsg),
[maybe_shutdown(Pid, Reason) || Pid <- [ConnPid, OldConnPid]], [maybe_shutdown(Pid, Reason) || Pid <- [ConnPid, OldConnPid]],
emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]). emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]).
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
@ -823,8 +824,8 @@ run_dispatch_steps([{subid, SubId}|Steps], Msg, State) ->
run_dispatch_steps(Steps, emqx_message:set_header('Subscription-Identifier', SubId, Msg), State). run_dispatch_steps(Steps, emqx_message:set_header('Subscription-Identifier', SubId, Msg), State).
%% Enqueue message if the client has been disconnected %% Enqueue message if the client has been disconnected
dispatch(Msg, State = #state{client_id = ClientId, conn_pid = undefined}) -> dispatch(Msg, State = #state{client_id = ClientId, username = Username, conn_pid = undefined}) ->
case emqx_hooks:run('message.dropped', [#{client_id => ClientId}, Msg]) of case emqx_hooks:run('message.dropped', [#{client_id => ClientId, username => Username}, Msg]) of
ok -> enqueue_msg(Msg, State); ok -> enqueue_msg(Msg, State);
stop -> State stop -> State
end; end;
@ -888,20 +889,20 @@ await(PacketId, Msg, State = #state{inflight = Inflight}) ->
PacketId, {publish, {PacketId, Msg}, os:timestamp()}, Inflight), PacketId, {publish, {PacketId, Msg}, os:timestamp()}, Inflight),
ensure_retry_timer(State#state{inflight = Inflight1}). ensure_retry_timer(State#state{inflight = Inflight1}).
acked(puback, PacketId, State = #state{client_id = ClientId, inflight = Inflight}) -> acked(puback, PacketId, State = #state{client_id = ClientId, username = Username, inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of case emqx_inflight:lookup(PacketId, Inflight) of
{value, {publish, {_, Msg}, _Ts}} -> {value, {publish, {_, Msg}, _Ts}} ->
emqx_hooks:run('message.acked', [#{client_id => ClientId}], Msg), emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}], Msg),
State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}; State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
none -> none ->
?LOG(warning, "Duplicated PUBACK PacketId ~w", [PacketId], State), ?LOG(warning, "Duplicated PUBACK PacketId ~w", [PacketId], State),
State State
end; end;
acked(pubrec, PacketId, State = #state{client_id = ClientId, inflight = Inflight}) -> acked(pubrec, PacketId, State = #state{client_id = ClientId, username = Username, inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of case emqx_inflight:lookup(PacketId, Inflight) of
{value, {publish, {_, Msg}, _Ts}} -> {value, {publish, {_, Msg}, _Ts}} ->
emqx_hooks:run('message.acked', [#{client_id => ClientId}], Msg), emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}], Msg),
State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)}; State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)};
{value, {pubrel, PacketId, _Ts}} -> {value, {pubrel, PacketId, _Ts}} ->
?LOG(warning, "Duplicated PUBREC PacketId ~w", [PacketId], State), ?LOG(warning, "Duplicated PUBREC PacketId ~w", [PacketId], State),

View File

@ -14,7 +14,7 @@
-module(emqx_time). -module(emqx_time).
-export([seed/0, now_secs/0, now_secs/1, now_ms/0, now_ms/1]). -export([seed/0, now_secs/0, now_secs/1, now_ms/0, now_ms/1, ts_from_ms/1]).
seed() -> seed() ->
rand:seed(exsplus, erlang:timestamp()). rand:seed(exsplus, erlang:timestamp()).
@ -30,3 +30,6 @@ now_ms() ->
now_ms({MegaSecs, Secs, MicroSecs}) -> now_ms({MegaSecs, Secs, MicroSecs}) ->
(MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000). (MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000).
ts_from_ms(Ms) ->
{Ms div 1000000, Ms rem 1000000, 0}.