Merge pull request #409 from emqtt/0.14

0.14 - subscription
This commit is contained in:
Feng Lee 2015-12-17 15:22:55 +08:00
commit d7a51fb287
11 changed files with 276 additions and 63 deletions

@ -1 +1 @@
Subproject commit 5276a19f1fb8b0868b02484e1e6974f12236c8fb
Subproject commit 86bfd890776f67b29d9df84bdf1b95ffe4702a5f

View File

@ -1,6 +1,6 @@
% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*-
%% ex: ft=erlang ts=4 sw=4 et
[{kernel,
[{kernel,
[{start_timer, true},
{start_pg2, true}
]},
@ -175,14 +175,22 @@
{modules, [
%% Client presence management module.
%% Publish messages when client connected or disconnected
{presence, [{qos, 0}]}
{presence, [{qos, 0}]},
%% Subscribe topics automatically when client connected
%% {autosub, [{"$Q/client/$c", 0}]}
{subscription, [
%% Subscription from stored table
stored,
%% $u will be replaced with username
{"$Q/username/$u", 1},
%% $c will be replaced with clientid
{"$Q/client/$c", 1}
]}
%% Rewrite rules
%% {rewrite, [{file, "etc/rewrite.config"}]}
]},
%% Plugins
{plugins, [

View File

@ -170,11 +170,19 @@
{presence, [{qos, 0}]}
%% Subscribe topics automatically when client connected
%% {autosub, [{"$Q/client/$c", 0}]}
%% {subscription, [
%% %% Subscription from stored table
%% stored,
%%
%% %% $u will be replaced with username
%% {"$Q/username/$u", 1},
%%
%% %% $c will be replaced with clientid
%% {"$Q/client/$c", 1}
%% ]}
%% Rewrite rules
%% {rewrite, [{file, "etc/rewrite.config"}]}
]},
%% Plugins
{plugins, [

View File

@ -29,6 +29,8 @@
-include("emqttd_cli.hrl").
-include("emqttd_protocol.hrl").
-import(lists, [foreach/2]).
-import(proplists, [get_value/2]).
@ -36,10 +38,8 @@
-export([load/0]).
-export([status/1, broker/1, cluster/1, bridges/1,
clients/1, sessions/1, plugins/1, listeners/1,
vm/1, mnesia/1, trace/1]).
%% TODO: topics, subscriptions...
clients/1, sessions/1, topics/1, subscriptions/1,
plugins/1, listeners/1, vm/1, mnesia/1, trace/1]).
-define(PROC_INFOKEYS, [status,
memory,
@ -49,6 +49,8 @@
stack_size,
reductions]).
-define(MAX_LINES, 20000).
-define(APP, emqttd).
load() ->
@ -70,10 +72,10 @@ status([]) ->
{InternalStatus, _ProvidedStatus} = init:get_status(),
?PRINT("Node ~p is ~p~n", [node(), InternalStatus]),
case lists:keysearch(?APP, 1, application:which_applications()) of
false ->
?PRINT_MSG("emqttd is not running~n");
{value, {?APP, _Desc, Vsn}} ->
?PRINT("emqttd ~s is running~n", [Vsn])
false ->
?PRINT_MSG("emqttd is not running~n");
{value, {?APP, _Desc, Vsn}} ->
?PRINT("emqttd ~s is running~n", [Vsn])
end;
status(_) ->
?PRINT_CMD("status", "query broker status").
@ -101,12 +103,12 @@ broker(["metrics"]) ->
broker(["pubsub"]) ->
Pubsubs = supervisor:which_children(emqttd_pubsub_sup),
foreach(fun({{_, Id}, Pid, _, _}) ->
ProcInfo = erlang:process_info(Pid, ?PROC_INFOKEYS),
?PRINT("pubsub: ~w~n", [Id]),
foreach(fun({Key, Val}) ->
?PRINT(" ~-18s: ~w~n", [Key, Val])
end, ProcInfo)
end, lists:reverse(Pubsubs));
ProcInfo = erlang:process_info(Pid, ?PROC_INFOKEYS),
?PRINT("pubsub: ~w~n", [Id]),
foreach(fun({Key, Val}) ->
?PRINT(" ~-18s: ~w~n", [Key, Val])
end, ProcInfo)
end, lists:reverse(Pubsubs));
broker(_) ->
?USAGE([{"broker", "query broker version, uptime and description"},
@ -123,7 +125,7 @@ cluster([]) ->
?PRINT("cluster nodes: ~p~n", [Nodes]);
cluster(usage) ->
?PRINT_CMD("cluster [<Node>]", "cluster with node, query cluster info ");
?PRINT_CMD("cluster [<Node>]", "cluster with node, query cluster info");
cluster([SNode]) ->
Node = emqttd_dist:parse_node(SNode),
@ -171,24 +173,22 @@ clients(["list"]) ->
emqttd_mnesia:dump(ets, mqtt_client, fun print/1);
clients(["show", ClientId]) ->
case emqttd_cm:lookup(list_to_binary(ClientId)) of
undefined -> ?PRINT_MSG("Not Found.~n");
Client -> print(Client)
end;
if_client(ClientId, fun print/1);
clients(["kick", ClientId]) ->
case emqttd_cm:lookup(list_to_binary(ClientId)) of
undefined ->
?PRINT_MSG("Not Found.~n");
#mqtt_client{client_pid = Pid} ->
emqttd_client:kick(Pid)
end;
if_client(ClientId, fun(#mqtt_client{client_pid = Pid}) -> emqttd_client:kick(Pid) end);
clients(_) ->
?USAGE([{"clients list", "list all clients"},
{"clients show <ClientId>", "show a client"},
{"clients kick <ClientId>", "kick a client"}]).
if_client(ClientId, Fun) ->
case emqttd_cm:lookup(bin(ClientId)) of
undefined -> ?PRINT_MSG("Not Found.~n");
Client -> Fun(Client)
end.
%%------------------------------------------------------------------------------
%% @doc Sessions Command
%% @end
@ -203,7 +203,7 @@ sessions(["list", "transient"]) ->
emqttd_mnesia:dump(ets, mqtt_transient_session, fun print/1);
sessions(["show", ClientId]) ->
MP = {{list_to_binary(ClientId), '_'}, '_'},
MP = {{bin(ClientId), '_'}, '_'},
case {ets:match_object(mqtt_transient_session, MP),
ets:match_object(mqtt_persistent_session, MP)} of
{[], []} ->
@ -220,6 +220,78 @@ sessions(_) ->
{"sessions list transient", "list all transient sessions"},
{"sessions show <ClientId>", "show a session"}]).
%%------------------------------------------------------------------------------
%% @doc Topics Command
%% @end
%%------------------------------------------------------------------------------
topics(["list"]) ->
Print = fun(Topic, Records) -> print(topic, Topic, Records) end,
if_could_print(topic, Print);
topics(["show", Topic]) ->
print(topic, Topic, ets:lookup(topic, bin(Topic)));
topics(_) ->
?USAGE([{"topics list", "list all topics"},
{"topics show <Topic>", "show a topic"}]).
subscriptions(["list"]) ->
Print = fun(ClientId, Records) -> print(subscription, ClientId, Records) end,
if_subscription(fun() -> if_could_print(subscription, Print) end);
subscriptions(["show", ClientId]) ->
if_subscription(fun() ->
case emqttd_pubsub:lookup(subscription, bin(ClientId)) of
[] -> ?PRINT_MSG("Not Found.~n");
Records -> print(subscription, ClientId, Records)
end
end);
subscriptions(["add", ClientId, Topic, QoS]) ->
Create = fun(IntQos) ->
Subscription = {bin(ClientId), bin(Topic), IntQos},
case emqttd_pubsub:create(subscription, Subscription) of
ok -> ?PRINT_MSG("ok~n");
{error, Error} -> ?PRINT("Error: ~p~n", [Error])
end
end,
if_subscription(fun() -> if_valid_qos(QoS, Create) end);
subscriptions(["del", ClientId, Topic]) ->
if_subscription(fun() ->
Ok = emqttd_pubsub:delete(subscription, {bin(ClientId), bin(Topic)}),
?PRINT("~p~n", [Ok])
end);
subscriptions(_) ->
?USAGE([{"subscriptions list", "list all subscriptions"},
{"subscriptions show <ClientId>", "show subscriptions of a client"},
{"subscriptions add <ClientId> <Topic> <QoS>", "add subscription"},
{"subscriptions del <ClientId> <Topic>", "delete subscription"}]).
if_subscription(Fun) ->
case ets:info(subscription, name) of
undefined -> ?PRINT_MSG("Error: subscription table not found!~n");
_ -> Fun()
end.
if_could_print(Tab, Fun) ->
case mnesia:table_info(Tab, size) of
Size when Size >= ?MAX_LINES ->
?PRINT("Could not list, too many ~ss: ~p~n", [Tab, Size]);
_Size ->
Keys = mnesia:dirty_all_keys(Tab),
foreach(fun(Key) -> Fun(Key, ets:lookup(Tab, Key)) end, Keys)
end.
if_valid_qos(QoS, Fun) ->
try list_to_integer(QoS) of
Int when ?IS_QOS(Int) -> Fun(Int);
_ -> ?PRINT_MSG("QoS should be 0, 1, 2~n")
catch _:_ ->
?PRINT_MSG("QoS should be 0, 1, 2~n")
end.
plugins(["list"]) ->
foreach(fun print/1, emqttd_plugins:list());
@ -296,9 +368,9 @@ parse_opts(Cmd, OptStr) ->
parse_opt(bridge, qos, Qos) ->
{qos, list_to_integer(Qos)};
parse_opt(bridge, suffix, Suffix) ->
{topic_suffix, list_to_binary(Suffix)};
{topic_suffix, bin(Suffix)};
parse_opt(bridge, prefix, Prefix) ->
{topic_prefix, list_to_binary(Prefix)};
{topic_prefix, bin(Prefix)};
parse_opt(bridge, queue, Len) ->
{max_queue_len, list_to_integer(Len)};
parse_opt(_Cmd, Opt, _Val) ->
@ -377,7 +449,7 @@ trace(_) ->
{"trace topic <Topic> off", "stop to trace Topic"}]).
trace_on(Who, Name, LogFile) ->
case emqttd_trace:start_trace({Who, list_to_binary(Name)}, LogFile) of
case emqttd_trace:start_trace({Who, bin(Name)}, LogFile) of
ok ->
?PRINT("trace ~s ~s successfully.~n", [Who, Name]);
{error, Error} ->
@ -385,7 +457,7 @@ trace_on(Who, Name, LogFile) ->
end.
trace_off(Who, Name) ->
case emqttd_trace:stop_trace({Who, list_to_binary(Name)}) of
case emqttd_trace:stop_trace({Who, bin(Name)}) of
ok ->
?PRINT("stop to trace ~s ~s successfully.~n", [Who, Name]);
{error, Error} ->
@ -423,6 +495,9 @@ print(#mqtt_client{client_id = ClientId, clean_sess = CleanSess,
emqttd_net:format(Peername),
emqttd_util:now_to_secs(ConnectedAt)]);
print(#mqtt_topic{topic = Topic, node = Node}) ->
?PRINT("~s on ~s~n", [Topic, Node]);
print({{ClientId, _ClientPid}, SessInfo}) ->
InfoKeys = [clean_sess,
max_inflight,
@ -440,6 +515,14 @@ print({{ClientId, _ClientPid}, SessInfo}) ->
"created_at=~w, subscriptions=~s)~n",
[ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]).
print(topic, Topic, Records) ->
Nodes = [Node || #mqtt_topic{node = Node} <- Records],
?PRINT("~s: on ~p~n", [Topic, Nodes]);
print(subscription, ClientId, Subscriptions) ->
TopicTable = [{Topic, Qos} || #mqtt_subscription{topic = Topic, qos = Qos} <- Subscriptions],
?PRINT("~s: ~p~n", [ClientId, TopicTable]).
format(created_at, Val) ->
emqttd_util:now_to_secs(Val);
@ -449,3 +532,6 @@ format(subscriptions, List) ->
format(_, Val) ->
Val.
bin(S) when is_list(S) -> list_to_binary(S);
bin(B) when is_binary(B) -> B.

View File

@ -19,11 +19,11 @@
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc emqttd auto subscribe module.
%%% @doc Subscription from Broker Side
%%%
%%% @author Feng Lee <feng@emqtt.io>
%%%-----------------------------------------------------------------------------
-module(emqttd_mod_autosub).
-module(emqttd_mod_subscription).
-behaviour(emqttd_gen_mod).
@ -33,29 +33,45 @@
-export([load/1, client_connected/3, unload/1]).
-record(state, {topics}).
-record(state, {topics, stored = false}).
-ifdef(TEST).
-compile(export_all).
-endif.
load(Opts) ->
Topics = [{list_to_binary(Topic), Qos} || {Topic, Qos} <- Opts, 0 =< Qos, Qos =< 2],
Topics = [{bin(Topic), QoS} || {Topic, QoS} <- Opts, ?IS_QOS(QoS)],
State = #state{topics = Topics, stored = lists:member(stored, Opts)},
emqttd_broker:hook('client.connected', {?MODULE, client_connected},
{?MODULE, client_connected, [Topics]}),
{ok, #state{topics = Topics}}.
{?MODULE, client_connected, [State]}),
{ok, State}.
client_connected(?CONNACK_ACCEPT, #mqtt_client{client_id = ClientId,
client_connected(?CONNACK_ACCEPT, #mqtt_client{client_id = ClientId,
client_pid = ClientPid,
username = Username}, Topics) ->
F = fun(Topic) ->
Topic1 = emqttd_topic:feed_var(<<"$c">>, ClientId, Topic),
if
Username =:= undefined -> Topic1;
true -> emqttd_topic:feed_var(<<"$u">>, Username, Topic1)
end
end,
emqttd_client:subscribe(ClientPid, [{F(Topic), Qos} || {Topic, Qos} <- Topics]);
username = Username},
#state{topics = Topics, stored = Stored}) ->
Replace = fun(Topic) -> rep(<<"$u">>, Username, rep(<<"$c">>, ClientId, Topic)) end,
TopicTable = with_stored(Stored, ClientId, [{Replace(Topic), Qos} || {Topic, Qos} <- Topics]),
emqttd_client:subscribe(ClientPid, TopicTable).
client_connected(_ConnAck, _Client, _Topics) ->
ignore.
with_stored(false, _ClientId, TopicTable) ->
TopicTable;
with_stored(true, ClientId, TopicTable) ->
Fun = fun(#mqtt_subscription{topic = Topic, qos = Qos}) -> {Topic, Qos} end,
emqttd_opts:merge([Fun(Sub) || Sub <- emqttd_pubsub:lookup(subscription, ClientId)], TopicTable).
unload(_Opts) ->
emqttd_broker:unhook('client.connected', {?MODULE, client_connected}).
rep(<<"$c">>, ClientId, Topic) ->
emqttd_topic:feed_var(<<"$c">>, ClientId, Topic);
rep(<<"$u">>, undefined, Topic) ->
Topic;
rep(<<"$u">>, Username, Topic) ->
emqttd_topic:feed_var(<<"$u">>, Username, Topic).
bin(B) when is_binary(B) ->
B;
bin(S) when is_list(S) ->
list_to_binary(S).

View File

@ -39,11 +39,13 @@
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
%% API Exports
%% API Exports
-export([start_link/4]).
-export([create/2, subscribe/1, subscribe/2,
unsubscribe/1, unsubscribe/2, publish/1]).
-export([create/2, lookup/2, subscribe/1, subscribe/2,
unsubscribe/1, unsubscribe/2, publish/1, delete/2]).
%% Subscriptions API
%% Local node
-export([match/1]).
@ -148,12 +150,36 @@ create(topic, Topic) when is_binary(Topic) ->
{aborted, Error} -> {error, Error}
end;
create(subscription, {SubId, Topic, Qos}) ->
create(subscription, {SubId, Topic, Qos}) when is_binary(SubId) andalso is_binary(Topic) ->
case mnesia:transaction(fun add_subscription/2, [SubId, {Topic, Qos}]) of
{atomic, ok} -> ok;
{aborted, Error} -> {error, Error}
end.
%%------------------------------------------------------------------------------
%% @doc Lookup Topic or Subscription.
%% @end
%%------------------------------------------------------------------------------
-spec lookup(topic | subscription, binary()) -> list().
lookup(topic, Topic) ->
mnesia:dirty_read(topic, Topic);
lookup(subscription, ClientId) ->
mnesia:dirty_read(subscription, ClientId).
%%------------------------------------------------------------------------------
%% @doc Delete Topic or Subscription.
%% @end
%%------------------------------------------------------------------------------
delete(topic, _Topic) ->
{error, unsupported};
delete(subscription, ClientId) when is_binary(ClientId) ->
mnesia:dirty_deleate({subscription, ClientId});
delete(subscription, {ClientId, Topic}) when is_binary(ClientId) ->
mnesia:async_dirty(fun remove_subscriptions/2, [ClientId, [Topic]]).
%%------------------------------------------------------------------------------
%% @doc Subscribe Topics
%% @end
@ -363,7 +389,7 @@ remove_subscriptions(SubId, Topics) ->
lists:foreach(fun(Topic) ->
Pattern = #mqtt_subscription{subid = SubId, topic = Topic, qos = '_'},
Records = mnesia:match_object(subscription, Pattern, write),
[delete_subscription(Record) || Record <- Records]
lists:foreach(fun delete_subscription/1, Records)
end, Topics).
delete_subscription(Record) ->

View File

@ -68,6 +68,12 @@ ensure_tab(Tab, Opts) ->
ok
end.
-ifdef(TEST).
destory() ->
ets:delete(route),
ets:delete(reverse_route).
-endif.
%%------------------------------------------------------------------------------
%% @doc Add Routes.
%% @end

View File

@ -483,7 +483,7 @@ handle_cast(Msg, State) ->
%% Dispatch Message
handle_info({dispatch, Topic, Msg}, Session = #session{subscriptions = Subscriptions})
when is_record(Msg, mqtt_message) ->
dispatch(fixqos(Topic, Msg, Subscriptions), Session);
dispatch(tune_qos(Topic, Msg, Subscriptions), Session);
handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = undefined,
awaiting_ack = AwaitingAck}) ->
@ -603,7 +603,7 @@ dispatch(Msg = #mqtt_message{qos = QoS}, Session = #session{message_queue = MsgQ
hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)})
end.
fixqos(Topic, Msg = #mqtt_message{qos = PubQos}, Subscriptions) ->
tune_qos(Topic, Msg = #mqtt_message{qos = PubQos}, Subscriptions) ->
case dict:find(Topic, Subscriptions) of
{ok, SubQos} when PubQos > SubQos ->
Msg#mqtt_message{qos = SubQos};

View File

@ -31,7 +31,7 @@
-define(SERVER, ?MODULE).
-export([start_link/0]).
-export([start_link/0, stop/0]).
%% statistics API.
-export([statsfun/1, statsfun/2,
@ -88,6 +88,9 @@
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
stop() ->
gen_server:call(?SERVER, stop).
%%------------------------------------------------------------------------------
%% @doc Generate stats fun
%% @end
@ -149,6 +152,9 @@ init([]) ->
% Tick to publish stats
{ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}.
handle_call(stop, _From, State) ->
{stop, normal, ok, State};
handle_call(_Request, _From, State) ->
{reply, error, State}.

View File

@ -0,0 +1,18 @@
-module(emqttd_mod_subscription_tests).
-include("emqttd.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-define(M, emqttd_mod_subscription).
rep_test() ->
?assertEqual(<<"topic/clientId">>, ?M:rep(<<"$c">>, <<"clientId">>, <<"topic/$c">>)),
?assertEqual(<<"topic/username">>, ?M:rep(<<"$u">>, <<"username">>, <<"topic/$u">>)),
?assertEqual(<<"topic/username/clientId">>,
?M:rep(<<"$c">>, <<"clientId">>,
?M:rep(<<"$u">>, <<"username">>, <<"topic/$u/$c">>))).
-endif.

View File

@ -0,0 +1,39 @@
-module(emqttd_router_tests).
-include("emqttd.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-define(ROUTER, emqttd_router).
route_test_() ->
{setup,
fun() -> ?ROUTER:init([]) end,
fun(_) -> ?ROUTER:destory() end,
[?_test(t_add_routes()),
?_test(t_delete_routes()),
?_test(t_has_route()),
?_test(t_route())
]}.
t_add_routes() ->
Pid = self(),
ok.
%?ROUTER:add_routes([<<"a">>, <<"b">>], Pid),
%?assertEqual([{<<"a">>, Pid}, {<<"b">>, Pid}], lists:sort(ets:tab2list(route))),
%?assertEqual([{Pid, <<"a">>}, {Pid, <<"b">>}], lists:sort(ets:tab2list(reverse_route))).
t_delete_routes() ->
ok.
t_has_route() ->
ok.
t_route() ->
ok.
-endif.