Merge branch '0.14' of github.com:emqtt/emqttd into 0.14
This commit is contained in:
commit
e1e09599ef
|
@ -278,14 +278,12 @@
|
||||||
|
|
||||||
%% Erlang System Monitor
|
%% Erlang System Monitor
|
||||||
{sysmon, [
|
{sysmon, [
|
||||||
%% Log file.
|
|
||||||
{logfile, "log/emqttd_sysmon.log"},
|
|
||||||
|
|
||||||
%% Long GC
|
%% Long GC
|
||||||
{long_gc, 100},
|
{long_gc, 100},
|
||||||
|
|
||||||
%% Long Schedule(ms)
|
%% Long Schedule(ms)
|
||||||
{long_schedule, 50},
|
{long_schedule, 100},
|
||||||
|
|
||||||
%% 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM.
|
%% 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM.
|
||||||
%% 8 * 1024 * 1024
|
%% 8 * 1024 * 1024
|
||||||
|
|
|
@ -270,22 +270,20 @@
|
||||||
|
|
||||||
%% Erlang System Monitor
|
%% Erlang System Monitor
|
||||||
{sysmon, [
|
{sysmon, [
|
||||||
%% Log file.
|
|
||||||
{logfile, "log/emqttd_sysmon.log"},
|
|
||||||
|
|
||||||
%% Long GC, don't monitor in production mode for:
|
%% Long GC, don't monitor in production mode for:
|
||||||
%% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421
|
%% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421
|
||||||
{long_gc, false},
|
{long_gc, false},
|
||||||
|
|
||||||
%% Long Schedule(ms)
|
%% Long Schedule(ms)
|
||||||
{long_schedule, 100},
|
{long_schedule, 240},
|
||||||
|
|
||||||
%% 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM.
|
%% 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM.
|
||||||
%% 8 * 1024 * 1024
|
%% 8 * 1024 * 1024
|
||||||
{large_heap, 8388608},
|
{large_heap, 8388608},
|
||||||
|
|
||||||
%% Busy Port
|
%% Busy Port
|
||||||
{busy_port, true},
|
{busy_port, false},
|
||||||
|
|
||||||
%% Busy Dist Port
|
%% Busy Dist Port
|
||||||
{busy_dist_port, true}
|
{busy_dist_port, true}
|
||||||
|
|
|
@ -157,7 +157,8 @@ stop() ->
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
init([Opts]) ->
|
init([Opts]) ->
|
||||||
ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected]),
|
ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected, {read_concurrency, true}]),
|
||||||
|
|
||||||
ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(auth, proplists:get_value(auth, Opts))}),
|
ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(auth, proplists:get_value(auth, Opts))}),
|
||||||
ets:insert(?ACCESS_CONTROL_TAB, {acl_modules, init_mods(acl, proplists:get_value(acl, Opts))}),
|
ets:insert(?ACCESS_CONTROL_TAB, {acl_modules, init_mods(acl, proplists:get_value(acl, Opts))}),
|
||||||
{ok, state}.
|
{ok, state}.
|
||||||
|
|
|
@ -63,7 +63,7 @@ all_rules() ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec init(AclOpts :: list()) -> {ok, State :: any()}.
|
-spec init(AclOpts :: list()) -> {ok, State :: any()}.
|
||||||
init(AclOpts) ->
|
init(AclOpts) ->
|
||||||
ets:new(?ACL_RULE_TAB, [set, public, named_table]),
|
ets:new(?ACL_RULE_TAB, [set, public, named_table, {read_concurrency, true}]),
|
||||||
AclFile = proplists:get_value(file, AclOpts),
|
AclFile = proplists:get_value(file, AclOpts),
|
||||||
Default = proplists:get_value(nomatch, AclOpts, allow),
|
Default = proplists:get_value(nomatch, AclOpts, allow),
|
||||||
State = #state{acl_file = AclFile, nomatch = Default},
|
State = #state{acl_file = AclFile, nomatch = Default},
|
||||||
|
|
|
@ -116,10 +116,10 @@ handle_call(Req, _From, State) ->
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?UNEXPECTED_MSG(Msg, State).
|
?UNEXPECTED_MSG(Msg, State).
|
||||||
|
|
||||||
handle_info({dispatch, Msg}, State = #state{mqueue = MQ, status = down}) ->
|
handle_info({dispatch, _Topic, Msg}, State = #state{mqueue = MQ, status = down}) ->
|
||||||
{noreply, State#state{mqueue = emqttd_mqueue:in(Msg, MQ)}};
|
{noreply, State#state{mqueue = emqttd_mqueue:in(Msg, MQ)}};
|
||||||
|
|
||||||
handle_info({dispatch, Msg}, State = #state{node = Node, status = up}) ->
|
handle_info({dispatch, _Topic, Msg}, State = #state{node = Node, status = up}) ->
|
||||||
rpc:cast(Node, emqttd_pubsub, publish, [transform(Msg, State)]),
|
rpc:cast(Node, emqttd_pubsub, publish, [transform(Msg, State)]),
|
||||||
{noreply, State, hibernate};
|
{noreply, State, hibernate};
|
||||||
|
|
||||||
|
|
|
@ -119,8 +119,7 @@ copy_table(Table) ->
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
wait_for_tables() ->
|
wait_for_tables() ->
|
||||||
%%TODO: is not right?
|
%% io:format("mnesia wait_for_tables: ~p~n", [mnesia:system_info(local_tables)]),
|
||||||
io:format("mnesia wait_for_tables: ~p~n", [mnesia:system_info(local_tables)]),
|
|
||||||
mnesia:wait_for_tables(mnesia:system_info(local_tables), infinity).
|
mnesia:wait_for_tables(mnesia:system_info(local_tables), infinity).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
|
@ -211,35 +211,35 @@ cast(Msg) ->
|
||||||
-spec publish(Msg :: mqtt_message()) -> ok.
|
-spec publish(Msg :: mqtt_message()) -> ok.
|
||||||
publish(Msg = #mqtt_message{from = From}) ->
|
publish(Msg = #mqtt_message{from = From}) ->
|
||||||
trace(publish, From, Msg),
|
trace(publish, From, Msg),
|
||||||
Msg1 = #mqtt_message{topic = Topic}
|
Msg1 = #mqtt_message{topic = To}
|
||||||
= emqttd_broker:foldl_hooks('message.publish', [], Msg),
|
= emqttd_broker:foldl_hooks('message.publish', [], Msg),
|
||||||
|
|
||||||
%% Retain message first. Don't create retained topic.
|
%% Retain message first. Don't create retained topic.
|
||||||
case emqttd_retainer:retain(Msg1) of
|
case emqttd_retainer:retain(Msg1) of
|
||||||
ok ->
|
ok ->
|
||||||
%% TODO: why unset 'retain' flag?
|
%% TODO: why unset 'retain' flag?
|
||||||
publish(Topic, emqttd_message:unset_flag(Msg1));
|
publish(To, emqttd_message:unset_flag(Msg1));
|
||||||
ignore ->
|
ignore ->
|
||||||
publish(Topic, Msg1)
|
publish(To, Msg1)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
publish(Topic, Msg) when is_binary(Topic) ->
|
publish(To, Msg) ->
|
||||||
lists:foreach(fun(#mqtt_topic{topic = Name, node = Node}) ->
|
lists:foreach(fun(#mqtt_topic{topic = Topic, node = Node}) ->
|
||||||
case Node =:= node() of
|
case Node =:= node() of
|
||||||
true -> ?ROUTER:route(Name, Msg);
|
true -> ?ROUTER:route(Topic, Msg);
|
||||||
false -> rpc:cast(Node, ?ROUTER, route, [Name, Msg])
|
false -> rpc:cast(Node, ?ROUTER, route, [Topic, Msg])
|
||||||
end
|
end
|
||||||
end, match(Topic)).
|
end, match(To)).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc Match Topic Name with Topic Filters
|
%% @doc Match Topic Name with Topic Filters
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec match(Topic :: binary()) -> [mqtt_topic()].
|
-spec match(binary()) -> [mqtt_topic()].
|
||||||
match(Topic) when is_binary(Topic) ->
|
match(To) ->
|
||||||
MatchedTopics = mnesia:async_dirty(fun emqttd_trie:match/1, [Topic]),
|
MatchedTopics = mnesia:async_dirty(fun emqttd_trie:match/1, [To]),
|
||||||
%% ets:lookup for topic table will be copied.
|
%% ets:lookup for topic table will be replicated.
|
||||||
lists:append([ets:lookup(topic, Name) || Name <- MatchedTopics]).
|
lists:append([ets:lookup(topic, Topic) || Topic <- MatchedTopics]).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% gen_server callbacks
|
%%% gen_server callbacks
|
||||||
|
@ -252,13 +252,16 @@ init([Pool, Id, StatsFun, Opts]) ->
|
||||||
|
|
||||||
handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From,
|
handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From,
|
||||||
State = #state{statsfun = StatsFun}) ->
|
State = #state{statsfun = StatsFun}) ->
|
||||||
|
|
||||||
|
Topics = [Topic || {Topic, _Qos} <- TopicTable],
|
||||||
|
|
||||||
%% Add routes first
|
%% Add routes first
|
||||||
?ROUTER:add_routes(TopicTable, SubPid),
|
?ROUTER:add_routes(Topics, SubPid),
|
||||||
|
|
||||||
%% Add topics
|
%% Insert topic records to global topic table
|
||||||
Topics = [#mqtt_topic{topic = Topic, node = node()} || {Topic, _Qos} <- TopicTable],
|
Records = [#mqtt_topic{topic = Topic, node = node()} || Topic <- Topics],
|
||||||
|
|
||||||
case mnesia:transaction(fun add_topics/1, [Topics]) of
|
case mnesia:transaction(fun add_topics/1, [Records]) of
|
||||||
{atomic, _} ->
|
{atomic, _} ->
|
||||||
StatsFun(topic),
|
StatsFun(topic),
|
||||||
if_subscription(
|
if_subscription(
|
||||||
|
@ -268,6 +271,7 @@ handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From,
|
||||||
emqttd_pooler:async_submit({mnesia, async_dirty, Args}),
|
emqttd_pooler:async_submit({mnesia, async_dirty, Args}),
|
||||||
StatsFun(subscription)
|
StatsFun(subscription)
|
||||||
end),
|
end),
|
||||||
|
%% Grant all qos...
|
||||||
{reply, {ok, [Qos || {_Topic, Qos} <- TopicTable]}, State};
|
{reply, {ok, [Qos || {_Topic, Qos} <- TopicTable]}, State};
|
||||||
{aborted, Error} ->
|
{aborted, Error} ->
|
||||||
{reply, {error, Error}, State}
|
{reply, {error, Error}, State}
|
||||||
|
@ -293,12 +297,13 @@ handle_cast(Msg, State) ->
|
||||||
?UNEXPECTED_MSG(Msg, State).
|
?UNEXPECTED_MSG(Msg, State).
|
||||||
|
|
||||||
handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) ->
|
handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) ->
|
||||||
|
|
||||||
Routes = ?ROUTER:lookup_routes(DownPid),
|
Routes = ?ROUTER:lookup_routes(DownPid),
|
||||||
|
|
||||||
%% Delete all routes of the process
|
%% Delete all routes of the process
|
||||||
?ROUTER:delete_routes(DownPid),
|
?ROUTER:delete_routes(DownPid),
|
||||||
|
|
||||||
?HELPER:aging([Topic || {Topic, _Qos} <- Routes, not ?ROUTER:has_route(Topic)]),
|
?HELPER:aging([Topic || Topic <- Routes, not ?ROUTER:has_route(Topic)]),
|
||||||
|
|
||||||
{noreply, State, hibernate};
|
{noreply, State, hibernate};
|
||||||
|
|
||||||
|
|
|
@ -142,7 +142,7 @@ dispatch(Topic, CPid) when is_binary(Topic) ->
|
||||||
end,
|
end,
|
||||||
mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained])
|
mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained])
|
||||||
end,
|
end,
|
||||||
lists:foreach(fun(Msg) -> CPid ! {dispatch, Msg} end, lists:reverse(Msgs)).
|
lists:foreach(fun(Msg) -> CPid ! {dispatch, Topic, Msg} end, lists:reverse(Msgs)).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% gen_server callbacks
|
%%% gen_server callbacks
|
||||||
|
|
|
@ -23,11 +23,11 @@
|
||||||
%%%
|
%%%
|
||||||
%%% Route Table:
|
%%% Route Table:
|
||||||
%%%
|
%%%
|
||||||
%%% Topic -> {Pid1, Qos}, {Pid2, Qos}, ...
|
%%% Topic -> Pid1, Pid2, ...
|
||||||
%%%
|
%%%
|
||||||
%%% Reverse Route Table:
|
%%% Reverse Route Table:
|
||||||
%%%
|
%%%
|
||||||
%%% Pid -> {Topic1, Qos}, {Topic2, Qos}, ...
|
%%% Pid -> Topic1, Topic2, ...
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%
|
%%%
|
||||||
|
@ -72,17 +72,15 @@ ensure_tab(Tab, Opts) ->
|
||||||
%% @doc Add Routes.
|
%% @doc Add Routes.
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec add_routes(list({binary(), mqtt_qos()}), pid()) -> ok.
|
-spec add_routes(list(binary()), pid()) -> ok.
|
||||||
add_routes(TopicTable, Pid) when is_pid(Pid) ->
|
add_routes(Topics, Pid) when is_pid(Pid) ->
|
||||||
with_stats(fun() ->
|
with_stats(fun() ->
|
||||||
case lookup_routes(Pid) of
|
case lookup_routes(Pid) of
|
||||||
[] ->
|
[] ->
|
||||||
erlang:monitor(process, Pid),
|
erlang:monitor(process, Pid),
|
||||||
insert_routes(TopicTable, Pid);
|
insert_routes(Topics, Pid);
|
||||||
TopicInEts ->
|
InEts ->
|
||||||
{NewTopics, UpdatedTopics} = diff(TopicTable, TopicInEts),
|
insert_routes(Topics -- InEts, Pid)
|
||||||
update_routes(UpdatedTopics, Pid),
|
|
||||||
insert_routes(NewTopics, Pid)
|
|
||||||
end
|
end
|
||||||
end).
|
end).
|
||||||
|
|
||||||
|
@ -90,9 +88,9 @@ add_routes(TopicTable, Pid) when is_pid(Pid) ->
|
||||||
%% @doc Lookup Routes
|
%% @doc Lookup Routes
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec lookup_routes(pid()) -> list({binary(), mqtt_qos()}).
|
-spec lookup_routes(pid()) -> list(binary()).
|
||||||
lookup_routes(Pid) when is_pid(Pid) ->
|
lookup_routes(Pid) when is_pid(Pid) ->
|
||||||
[{Topic, Qos} || {_, Topic, Qos} <- ets:lookup(reverse_route, Pid)].
|
[Topic || {_, Topic} <- ets:lookup(reverse_route, Pid)].
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc Has Route?
|
%% @doc Has Route?
|
||||||
|
@ -116,7 +114,7 @@ delete_routes(Topics, Pid) ->
|
||||||
-spec delete_routes(pid()) -> ok.
|
-spec delete_routes(pid()) -> ok.
|
||||||
delete_routes(Pid) when is_pid(Pid) ->
|
delete_routes(Pid) when is_pid(Pid) ->
|
||||||
with_stats(fun() ->
|
with_stats(fun() ->
|
||||||
Routes = [{Topic, Pid} || {Topic, _Qos} <- lookup_routes(Pid)],
|
Routes = [{Topic, Pid} || Topic <- lookup_routes(Pid)],
|
||||||
ets:delete(reverse_route, Pid),
|
ets:delete(reverse_route, Pid),
|
||||||
lists:foreach(fun delete_route_only/1, Routes)
|
lists:foreach(fun delete_route_only/1, Routes)
|
||||||
end).
|
end).
|
||||||
|
@ -132,8 +130,8 @@ route(Queue = <<"$Q/", _Q>>, Msg) ->
|
||||||
emqttd_metrics:inc('messages/dropped');
|
emqttd_metrics:inc('messages/dropped');
|
||||||
Routes ->
|
Routes ->
|
||||||
Idx = crypto:rand_uniform(1, length(Routes) + 1),
|
Idx = crypto:rand_uniform(1, length(Routes) + 1),
|
||||||
{_, SubPid, SubQos} = lists:nth(Idx, Routes),
|
{_, SubPid} = lists:nth(Idx, Routes),
|
||||||
SubPid ! {dispatch, tune_qos(SubQos, Msg)}
|
dispatch(SubPid, Queue, Msg)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
route(Topic, Msg) ->
|
route(Topic, Msg) ->
|
||||||
|
@ -141,69 +139,33 @@ route(Topic, Msg) ->
|
||||||
[] ->
|
[] ->
|
||||||
emqttd_metrics:inc('messages/dropped');
|
emqttd_metrics:inc('messages/dropped');
|
||||||
Routes ->
|
Routes ->
|
||||||
lists:foreach(
|
lists:foreach(fun({_Topic, SubPid}) ->
|
||||||
fun({_Topic, SubPid, SubQos}) ->
|
dispatch(SubPid, Topic, Msg)
|
||||||
SubPid ! {dispatch, tune_qos(SubQos, Msg)}
|
end, Routes)
|
||||||
end, Routes)
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
tune_qos(SubQos, Msg = #mqtt_message{qos = PubQos}) when PubQos > SubQos ->
|
dispatch(SubPid, Topic, Msg) -> SubPid ! {dispatch, Topic, Msg}.
|
||||||
Msg#mqtt_message{qos = SubQos};
|
|
||||||
tune_qos(_SubQos, Msg) ->
|
|
||||||
Msg.
|
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% Internal Functions
|
%%% Internal Functions
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
diff(TopicTable, TopicInEts) ->
|
|
||||||
diff(TopicTable, TopicInEts, [], []).
|
|
||||||
|
|
||||||
diff([], _TopicInEts, NewAcc, UpAcc) ->
|
|
||||||
{NewAcc, UpAcc};
|
|
||||||
|
|
||||||
diff([{Topic, Qos}|TopicTable], TopicInEts, NewAcc, UpAcc) ->
|
|
||||||
case lists:keyfind(Topic, 1, TopicInEts) of
|
|
||||||
{Topic, Qos} ->
|
|
||||||
diff(TopicTable, TopicInEts, NewAcc, UpAcc);
|
|
||||||
{Topic, _Qos} ->
|
|
||||||
diff(TopicTable, TopicInEts, NewAcc, [{Topic, Qos}|UpAcc]);
|
|
||||||
false ->
|
|
||||||
diff(TopicTable, TopicInEts, [{Topic, Qos}|NewAcc], UpAcc)
|
|
||||||
end.
|
|
||||||
|
|
||||||
insert_routes([], _Pid) ->
|
insert_routes([], _Pid) ->
|
||||||
ok;
|
ok;
|
||||||
insert_routes(TopicTable, Pid) ->
|
insert_routes(Topics, Pid) ->
|
||||||
{Routes, ReverseRoutes} = routes(TopicTable, Pid),
|
{Routes, ReverseRoutes} = routes(Topics, Pid),
|
||||||
ets:insert(route, Routes),
|
ets:insert(route, Routes),
|
||||||
ets:insert(reverse_route, ReverseRoutes).
|
ets:insert(reverse_route, ReverseRoutes).
|
||||||
|
|
||||||
update_routes([], _Pid) ->
|
routes(Topics, Pid) ->
|
||||||
ok;
|
lists:unzip([{{Topic, Pid}, {Pid, Topic}} || Topic <- Topics]).
|
||||||
update_routes(TopicTable, Pid) ->
|
|
||||||
{Routes, ReverseRoutes} = routes(TopicTable, Pid),
|
|
||||||
lists:foreach(fun update_route/1, Routes),
|
|
||||||
lists:foreach(fun update_reverse_route/1, ReverseRoutes).
|
|
||||||
|
|
||||||
update_route(Route = {Topic, Pid, _Qos}) ->
|
|
||||||
ets:match_delete(route, {Topic, Pid, '_'}),
|
|
||||||
ets:insert(route, Route).
|
|
||||||
|
|
||||||
update_reverse_route(RevRoute = {Pid, Topic, _Qos}) ->
|
|
||||||
ets:match_delete(reverse_route, {Pid, Topic, '_'}),
|
|
||||||
ets:insert(reverse_route, RevRoute).
|
|
||||||
|
|
||||||
routes(TopicTable, Pid) ->
|
|
||||||
F = fun(Topic, Qos) -> {{Topic, Pid, Qos}, {Pid, Topic, Qos}} end,
|
|
||||||
lists:unzip([F(Topic, Qos) || {Topic, Qos} <- TopicTable]).
|
|
||||||
|
|
||||||
delete_route({Topic, Pid}) ->
|
delete_route({Topic, Pid}) ->
|
||||||
ets:match_delete(reverse_route, {Pid, Topic, '_'}),
|
ets:delete_object(reverse_route, {Pid, Topic}),
|
||||||
ets:match_delete(route, {Topic, Pid, '_'}).
|
ets:delete_object(route, {Topic, Pid}).
|
||||||
|
|
||||||
delete_route_only({Topic, Pid}) ->
|
delete_route_only({Topic, Pid}) ->
|
||||||
ets:match_delete(route, {Topic, Pid, '_'}).
|
ets:delete_object(route, {Topic, Pid}).
|
||||||
|
|
||||||
with_stats(Fun) ->
|
with_stats(Fun) ->
|
||||||
Ok = Fun(), setstats(), Ok.
|
Ok = Fun(), setstats(), Ok.
|
||||||
|
|
|
@ -85,9 +85,8 @@
|
||||||
%% Last packet id of the session
|
%% Last packet id of the session
|
||||||
packet_id = 1,
|
packet_id = 1,
|
||||||
|
|
||||||
%%TODO: Removed??
|
|
||||||
%% Client’s subscriptions.
|
%% Client’s subscriptions.
|
||||||
subscriptions :: list(),
|
subscriptions :: dict:dict(),
|
||||||
|
|
||||||
%% Inflight qos1, qos2 messages sent to the client but unacked,
|
%% Inflight qos1, qos2 messages sent to the client but unacked,
|
||||||
%% QoS 1 and QoS 2 messages which have been sent to the Client,
|
%% QoS 1 and QoS 2 messages which have been sent to the Client,
|
||||||
|
@ -246,7 +245,7 @@ init([CleanSess, ClientId, ClientPid]) ->
|
||||||
clean_sess = CleanSess,
|
clean_sess = CleanSess,
|
||||||
client_id = ClientId,
|
client_id = ClientId,
|
||||||
client_pid = ClientPid,
|
client_pid = ClientPid,
|
||||||
subscriptions = [],
|
subscriptions = dict:new(),
|
||||||
inflight_queue = [],
|
inflight_queue = [],
|
||||||
max_inflight = emqttd_opts:g(max_inflight, SessEnv, 0),
|
max_inflight = emqttd_opts:g(max_inflight, SessEnv, 0),
|
||||||
message_queue = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()),
|
message_queue = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()),
|
||||||
|
@ -284,12 +283,12 @@ prioritise_cast(Msg, _Len, _State) ->
|
||||||
|
|
||||||
prioritise_info(Msg, _Len, _State) ->
|
prioritise_info(Msg, _Len, _State) ->
|
||||||
case Msg of
|
case Msg of
|
||||||
{'EXIT', _, _} -> 10;
|
{'EXIT', _, _} -> 10;
|
||||||
expired -> 10;
|
expired -> 10;
|
||||||
{timeout, _, _} -> 5;
|
{timeout, _, _} -> 5;
|
||||||
collect_info -> 2;
|
collect_info -> 2;
|
||||||
{dispatch, _} -> 1;
|
{dispatch, _, _} -> 1;
|
||||||
_ -> 0
|
_ -> 0
|
||||||
end.
|
end.
|
||||||
|
|
||||||
handle_call(info, _From, State) ->
|
handle_call(info, _From, State) ->
|
||||||
|
@ -316,7 +315,7 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = Cli
|
||||||
|
|
||||||
TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0),
|
TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0),
|
||||||
|
|
||||||
case TopicTable -- Subscriptions of
|
case TopicTable -- dict:to_list(Subscriptions) of
|
||||||
[] ->
|
[] ->
|
||||||
AckFun([Qos || {_, Qos} <- TopicTable]),
|
AckFun([Qos || {_, Qos} <- TopicTable]),
|
||||||
hibernate(Session);
|
hibernate(Session);
|
||||||
|
@ -331,21 +330,22 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = Cli
|
||||||
?LOG(info, "Subscribe ~p, Granted QoS: ~p", [TopicTable, GrantedQos], Session),
|
?LOG(info, "Subscribe ~p, Granted QoS: ~p", [TopicTable, GrantedQos], Session),
|
||||||
|
|
||||||
Subscriptions1 =
|
Subscriptions1 =
|
||||||
lists:foldl(fun({Topic, Qos}, Acc) ->
|
lists:foldl(fun({Topic, Qos}, Dict) ->
|
||||||
case lists:keyfind(Topic, 1, Acc) of
|
case dict:find(Topic, Dict) of
|
||||||
{Topic, Qos} ->
|
{ok, Qos} ->
|
||||||
?LOG(warning, "resubscribe ~s, qos = ~w", [Topic, Qos], Session),
|
?LOG(warning, "resubscribe ~s, qos = ~w", [Topic, Qos], Session),
|
||||||
Acc;
|
Dict;
|
||||||
{Topic, OldQos} ->
|
{ok, OldQos} ->
|
||||||
?LOG(warning, "resubscribe ~s, old qos=~w, new qos=~w", [Topic, OldQos, Qos], Session),
|
?LOG(warning, "resubscribe ~s, old qos=~w, new qos=~w", [Topic, OldQos, Qos], Session),
|
||||||
lists:keyreplace(Topic, 1, Acc, {Topic, Qos});
|
dict:store(Topic, Qos, Dict);
|
||||||
false ->
|
error ->
|
||||||
%%TODO: the design is ugly, rewrite later...:(
|
%%TODO: the design is ugly, rewrite later...:(
|
||||||
%% <MQTT V3.1.1>: 3.8.4
|
%% <MQTT V3.1.1>: 3.8.4
|
||||||
%% Where the Topic Filter is not identical to any existing Subscription’s filter,
|
%% Where the Topic Filter is not identical to any existing Subscription’s filter,
|
||||||
%% a new Subscription is created and all matching retained messages are sent.
|
%% a new Subscription is created and all matching retained messages are sent.
|
||||||
emqttd_retainer:dispatch(Topic, self()),
|
emqttd_retainer:dispatch(Topic, self()),
|
||||||
[{Topic, Qos} | Acc]
|
|
||||||
|
dict:store(Topic, Qos, Dict)
|
||||||
end
|
end
|
||||||
end, Subscriptions, TopicTable),
|
end, Subscriptions, TopicTable),
|
||||||
hibernate(Session#session{subscriptions = Subscriptions1})
|
hibernate(Session#session{subscriptions = Subscriptions1})
|
||||||
|
@ -362,13 +362,8 @@ handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId,
|
||||||
?LOG(info, "unsubscribe ~p", [Topics], Session),
|
?LOG(info, "unsubscribe ~p", [Topics], Session),
|
||||||
|
|
||||||
Subscriptions1 =
|
Subscriptions1 =
|
||||||
lists:foldl(fun(Topic, Acc) ->
|
lists:foldl(fun(Topic, Dict) ->
|
||||||
case lists:keyfind(Topic, 1, Acc) of
|
dict:erase(Topic, Dict)
|
||||||
{Topic, _Qos} ->
|
|
||||||
lists:keydelete(Topic, 1, Acc);
|
|
||||||
false ->
|
|
||||||
Acc
|
|
||||||
end
|
|
||||||
end, Subscriptions, Topics),
|
end, Subscriptions, Topics),
|
||||||
|
|
||||||
hibernate(Session#session{subscriptions = Subscriptions1});
|
hibernate(Session#session{subscriptions = Subscriptions1});
|
||||||
|
@ -485,28 +480,10 @@ handle_cast({pubcomp, PktId}, Session = #session{awaiting_comp = AwaitingComp})
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?UNEXPECTED_MSG(Msg, State).
|
?UNEXPECTED_MSG(Msg, State).
|
||||||
|
|
||||||
%% Queue messages when client is offline
|
%% Dispatch Message
|
||||||
handle_info({dispatch, Msg}, Session = #session{client_pid = undefined,
|
handle_info({dispatch, Topic, Msg}, Session = #session{subscriptions = Subscriptions})
|
||||||
message_queue = Q})
|
|
||||||
when is_record(Msg, mqtt_message) ->
|
when is_record(Msg, mqtt_message) ->
|
||||||
hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)});
|
dispatch(fixqos(Topic, Msg, Subscriptions), Session);
|
||||||
|
|
||||||
%% Dispatch qos0 message directly to client
|
|
||||||
handle_info({dispatch, Msg = #mqtt_message{qos = ?QOS_0}},
|
|
||||||
Session = #session{client_pid = ClientPid}) ->
|
|
||||||
ClientPid ! {deliver, Msg},
|
|
||||||
hibernate(Session);
|
|
||||||
|
|
||||||
handle_info({dispatch, Msg = #mqtt_message{qos = QoS}},
|
|
||||||
Session = #session{message_queue = MsgQ})
|
|
||||||
when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
|
|
||||||
|
|
||||||
case check_inflight(Session) of
|
|
||||||
true ->
|
|
||||||
noreply(deliver(Msg, Session));
|
|
||||||
false ->
|
|
||||||
hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)})
|
|
||||||
end;
|
|
||||||
|
|
||||||
handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = undefined,
|
handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = undefined,
|
||||||
awaiting_ack = AwaitingAck}) ->
|
awaiting_ack = AwaitingAck}) ->
|
||||||
|
@ -604,6 +581,38 @@ kick(ClientId, OldPid, Pid) ->
|
||||||
%% Clean noproc
|
%% Clean noproc
|
||||||
receive {'EXIT', OldPid, _} -> ok after 0 -> ok end.
|
receive {'EXIT', OldPid, _} -> ok after 0 -> ok end.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Dispatch Messages
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% Queue message if client disconnected
|
||||||
|
dispatch(Msg, Session = #session{client_pid = undefined, message_queue = Q}) ->
|
||||||
|
hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)});
|
||||||
|
|
||||||
|
%% Deliver qos0 message directly to client
|
||||||
|
dispatch(Msg = #mqtt_message{qos = ?QOS0}, Session = #session{client_pid = ClientPid}) ->
|
||||||
|
ClientPid ! {deliver, Msg},
|
||||||
|
hibernate(Session);
|
||||||
|
|
||||||
|
dispatch(Msg = #mqtt_message{qos = QoS}, Session = #session{message_queue = MsgQ})
|
||||||
|
when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 ->
|
||||||
|
case check_inflight(Session) of
|
||||||
|
true ->
|
||||||
|
noreply(deliver(Msg, Session));
|
||||||
|
false ->
|
||||||
|
hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)})
|
||||||
|
end.
|
||||||
|
|
||||||
|
fixqos(Topic, Msg = #mqtt_message{qos = PubQos}, Subscriptions) ->
|
||||||
|
case dict:find(Topic, Subscriptions) of
|
||||||
|
{ok, SubQos} when PubQos > SubQos ->
|
||||||
|
Msg#mqtt_message{qos = SubQos};
|
||||||
|
{ok, _SubQos} ->
|
||||||
|
Msg;
|
||||||
|
error ->
|
||||||
|
Msg
|
||||||
|
end.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Check inflight and awaiting_rel
|
%% Check inflight and awaiting_rel
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -723,7 +732,7 @@ sess_info(#session{clean_sess = CleanSess,
|
||||||
timestamp = CreatedAt}) ->
|
timestamp = CreatedAt}) ->
|
||||||
Stats = emqttd_mqueue:stats(MessageQueue),
|
Stats = emqttd_mqueue:stats(MessageQueue),
|
||||||
[{clean_sess, CleanSess},
|
[{clean_sess, CleanSess},
|
||||||
{subscriptions, Subscriptions},
|
{subscriptions, dict:to_list(Subscriptions)},
|
||||||
{max_inflight, MaxInflight},
|
{max_inflight, MaxInflight},
|
||||||
{inflight_queue, length(InflightQueue)},
|
{inflight_queue, length(InflightQueue)},
|
||||||
{message_queue, proplists:get_value(len, Stats)},
|
{message_queue, proplists:get_value(len, Stats)},
|
||||||
|
|
|
@ -60,8 +60,9 @@ start_link(Opts) ->
|
||||||
init([Opts]) ->
|
init([Opts]) ->
|
||||||
erlang:system_monitor(self(), parse_opt(Opts)),
|
erlang:system_monitor(self(), parse_opt(Opts)),
|
||||||
{ok, TRef} = timer:send_interval(timer:seconds(1), reset),
|
{ok, TRef} = timer:send_interval(timer:seconds(1), reset),
|
||||||
{ok, TraceLog} = start_tracelog(proplists:get_value(logfile, Opts)),
|
%%TODO: don't trace for performance issue.
|
||||||
{ok, #state{tickref = TRef, tracelog = TraceLog}}.
|
%%{ok, TraceLog} = start_tracelog(proplists:get_value(logfile, Opts)),
|
||||||
|
{ok, #state{tickref = TRef}}.
|
||||||
|
|
||||||
parse_opt(Opts) ->
|
parse_opt(Opts) ->
|
||||||
parse_opt(Opts, []).
|
parse_opt(Opts, []).
|
||||||
|
@ -71,6 +72,8 @@ parse_opt([{long_gc, false}|Opts], Acc) ->
|
||||||
parse_opt(Opts, Acc);
|
parse_opt(Opts, Acc);
|
||||||
parse_opt([{long_gc, Ms}|Opts], Acc) when is_integer(Ms) ->
|
parse_opt([{long_gc, Ms}|Opts], Acc) when is_integer(Ms) ->
|
||||||
parse_opt(Opts, [{long_gc, Ms}|Acc]);
|
parse_opt(Opts, [{long_gc, Ms}|Acc]);
|
||||||
|
parse_opt([{long_schedule, false}|Opts], Acc) ->
|
||||||
|
parse_opt(Opts, Acc);
|
||||||
parse_opt([{long_schedule, Ms}|Opts], Acc) when is_integer(Ms) ->
|
parse_opt([{long_schedule, Ms}|Opts], Acc) when is_integer(Ms) ->
|
||||||
parse_opt(Opts, [{long_schedule, Ms}|Acc]);
|
parse_opt(Opts, [{long_schedule, Ms}|Acc]);
|
||||||
parse_opt([{large_heap, Size}|Opts], Acc) when is_integer(Size) ->
|
parse_opt([{large_heap, Size}|Opts], Acc) when is_integer(Size) ->
|
||||||
|
@ -83,7 +86,7 @@ parse_opt([{busy_dist_port, true}|Opts], Acc) ->
|
||||||
parse_opt(Opts, [busy_dist_port|Acc]);
|
parse_opt(Opts, [busy_dist_port|Acc]);
|
||||||
parse_opt([{busy_dist_port, false}|Opts], Acc) ->
|
parse_opt([{busy_dist_port, false}|Opts], Acc) ->
|
||||||
parse_opt(Opts, Acc);
|
parse_opt(Opts, Acc);
|
||||||
parse_opt([{logfile, _}|Opts], Acc) ->
|
parse_opt([_Opt|Opts], Acc) ->
|
||||||
parse_opt(Opts, Acc).
|
parse_opt(Opts, Acc).
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
|
|
Loading…
Reference in New Issue