EMQ X R3.0 - Improve the design of pubsub and router

This commit is contained in:
Feng Lee 2018-03-21 16:48:52 +08:00
parent f218c6a35d
commit 282e341433
81 changed files with 1457 additions and 1168 deletions

View File

@ -36,13 +36,41 @@
-define(SHARE, <<"$share/">>). %% Shared Topic
%%--------------------------------------------------------------------
%% Client and Session
%%--------------------------------------------------------------------
-type(topic() :: binary()).
-type(subscriber() :: pid() | binary() | {binary(), pid()}).
-type(suboption() :: {qos, non_neg_integer()} | {share, {'$queue' | binary()}}).
-type(client_id() :: binary() | atom()).
-type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | atom()).
-type(client() :: #{zone := atom(),
node := atom(),
clientid := client_id(),
protocol := protocol(),
connector => atom(),
peername => {inet:ip_address(), inet:port_number()},
username => binary(),
atom() => term()}).
-type(session() :: #{client_id := client_id(),
clean_start := boolean(),
expiry_interval := non_neg_integer()}).
%%--------------------------------------------------------------------
%% Message and Delivery
%%--------------------------------------------------------------------
-type(message_id() :: binary() | undefined).
-type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | atom()).
%% -type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | atom()).
-type(message_from() :: #{zone := atom(),
node := atom(),
@ -148,7 +176,7 @@
%% Route
%%--------------------------------------------------------------------
-record(route, { topic :: binary(), node :: node() }).
-record(route, { topic :: binary(), dest :: {binary(), node()} | node() }).
-type(route() :: #route{}).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -32,8 +32,11 @@
-export([subscribe/1, subscribe/2, subscribe/3, publish/1,
unsubscribe/1, unsubscribe/2]).
%% PubSub Management API
-export([setqos/3, topics/0, subscriptions/1, subscribers/1, subscribed/2]).
%% PubSub management API
-export([topics/0, subscriptions/1, subscribers/1, subscribed/2]).
%% Get/Set suboptions
-export([getopts/2, setopts/3]).
%% Hooks API
-export([hook/4, hook/3, unhook/2, run_hooks/2, run_hooks/3]).
@ -46,21 +49,13 @@
-type(listener() :: {atom(), esockd:listen_on(), [esockd:option()]}).
-type(subid() :: binary()).
-type(subscriber() :: pid() | subid() | {subid(), pid()}).
-type(suboption() :: local | {qos, non_neg_integer()} | {share, {'$queue' | binary()}}).
-export_type([subscriber/0, suboption/0]).
-define(APP, ?MODULE).
%%--------------------------------------------------------------------
%% Bootstrap, environment, configuration, is_running...
%%--------------------------------------------------------------------
%% @doc Start emqx application.
%% @doc Start emqx application
-spec(start() -> ok | {error, term()}).
start() -> application:start(?APP).
@ -68,7 +63,7 @@ start() -> application:start(?APP).
-spec(stop() -> ok | {error, term()}).
stop() -> application:stop(?APP).
%% @doc Get Environment
%% @doc Get environment
-spec(env(Key :: atom()) -> {ok, any()} | undefined).
env(Key) -> application:get_env(?APP, Key).
@ -76,7 +71,7 @@ env(Key) -> application:get_env(?APP, Key).
-spec(env(Key :: atom(), Default :: any()) -> undefined | any()).
env(Key, Default) -> application:get_env(?APP, Key, Default).
%% @doc Is running?
%% @doc Is emqx running?
-spec(is_running(node()) -> boolean()).
is_running(Node) ->
case rpc:call(Node, erlang, whereis, [?APP]) of
@ -110,12 +105,9 @@ start_listener({Proto, ListenOn, Opts}) when Proto == http; Proto == ws ->
start_listener({Proto, ListenOn, Opts}) when Proto == https; Proto == wss ->
{ok, _} = mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqx_ws, handle_request, []}).
% start_listener({Proto, ListenOn, Opts}) when Proto == api ->
% {ok, _} = mochiweb:start_http('mqtt:api', ListenOn, Opts, emqx_http:http_handler()).
start_listener(Proto, ListenOn, Opts) ->
Env = lists:append(emqx:env(client, []), emqx:env(protocol, [])),
MFArgs = {emqx_client, start_link, [Env]},
MFArgs = {emqx_connection, start_link, [Env]},
{ok, _} = esockd:open(Proto, ListenOn, merge_sockopts(Opts), MFArgs).
listeners() ->
@ -169,58 +161,71 @@ merge_sockopts(Options) ->
emqx_misc:merge_opts(Options, [{sockopts, SockOpts}]).
%%--------------------------------------------------------------------
%% PubSub APIs
%% PubSub API
%%--------------------------------------------------------------------
%% @doc Subscribe
-spec(subscribe(iodata()) -> ok | {error, term()}).
-spec(subscribe(topic() | string()) -> ok | {error, term()}).
subscribe(Topic) ->
emqx_server:subscribe(iolist_to_binary(Topic)).
emqx_broker:subscribe(iolist_to_binary(Topic)).
-spec(subscribe(iodata(), subscriber()) -> ok | {error, term()}).
-spec(subscribe(topic() | iodata(), subscriber() | string()) -> ok | {error, term()}).
subscribe(Topic, Subscriber) ->
emqx_server:subscribe(iolist_to_binary(Topic), Subscriber).
emqx_broker:subscribe(iolist_to_binary(Topic), list_to_subid(Subscriber)).
-spec(subscribe(iodata(), subscriber(), [suboption()]) -> ok | {error, term()}).
-spec(subscribe(topic() | iodata(), subscriber() | string(), [suboption()]) -> ok | {error, term()}).
subscribe(Topic, Subscriber, Options) ->
emqx_server:subscribe(iolist_to_binary(Topic), Subscriber, Options).
emqx_broker:subscribe(iolist_to_binary(Topic), list_to_subid(Subscriber), Options).
%% @doc Publish MQTT Message
-spec(publish(mqtt_message()) -> {ok, mqtt_delivery()} | ignore).
%% @doc Publish Message
-spec(publish(message()) -> {ok, delivery()} | ignore).
publish(Msg) ->
emqx_server:publish(Msg).
emqx_broker:publish(Msg).
%% @doc Unsubscribe
-spec(unsubscribe(iodata()) -> ok | {error, term()}).
-spec(unsubscribe(topic() | string()) -> ok | {error, term()}).
unsubscribe(Topic) ->
emqx_server:unsubscribe(iolist_to_binary(Topic)).
emqx_broker:unsubscribe(iolist_to_binary(Topic)).
-spec(unsubscribe(iodata(), subscriber()) -> ok | {error, term()}).
-spec(unsubscribe(topic() | string(), subscriber() | string()) -> ok | {error, term()}).
unsubscribe(Topic, Subscriber) ->
emqx_server:unsubscribe(iolist_to_binary(Topic), Subscriber).
emqx_broker:unsubscribe(iolist_to_binary(Topic), list_to_subid(Subscriber)).
%%--------------------------------------------------------------------
%% PubSub Management API
%% PubSub management API
%%--------------------------------------------------------------------
-spec(setqos(binary(), subscriber(), mqtt_qos()) -> ok).
setqos(Topic, Subscriber, Qos) ->
emqx_server:setqos(iolist_to_binary(Topic), Subscriber, Qos).
-spec(getopts(topic() | string(), subscriber()) -> [suboption()]).
getopts(Topic, Subscriber) ->
emqx_broker:getopts(iolist_to_binary(Topic), list_to_subid(Subscriber)).
-spec(topics() -> [binary()]).
-spec(setopts(topic() | string(), subscriber(), [suboption()]) -> ok).
setopts(Topic, Subscriber, Options) when is_list(Options) ->
emqx_broker:setopts(iolist_to_binary(Topic), list_to_subid(Subscriber), Options).
-spec(topics() -> list(topic())).
topics() -> emqx_router:topics().
-spec(subscribers(iodata()) -> list(subscriber())).
-spec(subscribers(topic() | string()) -> list(subscriber())).
subscribers(Topic) ->
emqx_server:subscribers(iolist_to_binary(Topic)).
emqx_broker:subscribers(iolist_to_binary(Topic)).
-spec(subscriptions(subscriber()) -> [{subscriber(), binary(), list(suboption())}]).
-spec(subscriptions(subscriber() | string()) -> [{topic(), list(suboption())}]).
subscriptions(Subscriber) ->
emqx_server:subscriptions(Subscriber).
emqx_broker:subscriptions(Subscriber).
-spec(subscribed(iodata(), subscriber()) -> boolean()).
-spec(subscribed(topic() | string(), subscriber()) -> boolean()).
subscribed(Topic, Subscriber) ->
emqx_server:subscribed(iolist_to_binary(Topic), Subscriber).
emqx_broker:subscribed(iolist_to_binary(Topic), list_to_subid(Subscriber)).
list_to_subid(SubId) when is_binary(SubId) ->
SubId;
list_to_subid(SubId) when is_list(SubId) ->
iolist_to_binary(SubId);
list_to_subid(SubPid) when is_pid(SubPid) ->
SubPid;
list_to_subid({SubId, SubPid}) when is_binary(SubId), is_pid(SubPid) ->
{SubId, SubPid};
list_to_subid({SubId, SubPid}) when is_list(SubId), is_pid(SubPid) ->
{iolist_to_binary(SubId), SubPid}.
%%--------------------------------------------------------------------
%% Hooks API
@ -257,7 +262,7 @@ shutdown() ->
shutdown(normal).
shutdown(Reason) ->
lager:error("EMQ shutdown for ~s", [Reason]),
emqx_log:error("EMQ shutdown for ~s", [Reason]),
emqx_plugins:unload(),
lists:foreach(fun application:stop/1, [emqx, ekka, mochiweb, esockd, gproc]).
@ -268,5 +273,5 @@ reboot() ->
%% Debug
%%--------------------------------------------------------------------
dump() -> lists:append([emqx_server:dump(), emqx_router:dump()]).
dump() -> lists:append([emqx_broker:dump(), emqx_router:dump()]).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -25,8 +25,6 @@
-type(access() :: subscribe | publish | pubsub).
-type(topic() :: binary()).
-type(rule() :: {allow, all} |
{allow, who(), access(), list(topic())} |
{deny, all} |
@ -42,7 +40,7 @@
compile({A, all}) when ?ALLOW_DENY(A) ->
{A, all};
compile({A, Who, Access, Topic}) when ?ALLOW_DENY(A) andalso is_binary(Topic) ->
compile({A, Who, Access, Topic}) when ?ALLOW_DENY(A), is_binary(Topic) ->
{A, compile(who, Who), Access, [compile(topic, Topic)]};
compile({A, Who, Access, TopicFilters}) when ?ALLOW_DENY(A) ->
@ -70,7 +68,7 @@ compile(topic, {eq, Topic}) ->
compile(topic, Topic) ->
Words = emqx_topic:words(bin(Topic)),
case 'pattern?'(Words) of
true -> {pattern, Words};
true -> {pattern, Words};
false -> Words
end.
@ -83,13 +81,14 @@ bin(L) when is_list(L) ->
bin(B) when is_binary(B) ->
B.
%% @doc Match Access Rule
%% @doc Match access rule
-spec(match(mqtt_client(), topic(), rule()) -> {matched, allow} | {matched, deny} | nomatch).
match(_Client, _Topic, {AllowDeny, all}) when (AllowDeny =:= allow) orelse (AllowDeny =:= deny) ->
{matched, AllowDeny};
match(Client, Topic, {AllowDeny, Who, _PubSub, TopicFilters})
when (AllowDeny =:= allow) orelse (AllowDeny =:= deny) ->
case match_who(Client, Who) andalso match_topics(Client, Topic, TopicFilters) of
case match_who(Client, Who)
andalso match_topics(Client, Topic, TopicFilters) of
true -> {matched, AllowDeny};
false -> nomatch
end.
@ -123,15 +122,11 @@ match_topics(_Client, _Topic, []) ->
false;
match_topics(Client, Topic, [{pattern, PatternFilter}|Filters]) ->
TopicFilter = feed_var(Client, PatternFilter),
case match_topic(emqx_topic:words(Topic), TopicFilter) of
true -> true;
false -> match_topics(Client, Topic, Filters)
end;
match_topic(emqx_topic:words(Topic), TopicFilter)
orelse match_topics(Client, Topic, Filters);
match_topics(Client, Topic, [TopicFilter|Filters]) ->
case match_topic(emqx_topic:words(Topic), TopicFilter) of
true -> true;
false -> match_topics(Client, Topic, Filters)
end.
match_topic(emqx_topic:words(Topic), TopicFilter)
orelse match_topics(Client, Topic, Filters).
match_topic(Topic, {eq, TopicFilter}) ->
Topic =:= TopicFilter;

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -70,7 +70,8 @@ init([Pool, Id, Node, Topic, Options]) ->
true ->
true = erlang:monitor_node(Node, true),
Share = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]),
emqx_server:subscribe(Topic, self(), [local, {share, Share}, {qos, ?QOS_0}]),
%% TODO:: local???
emqx_broker:subscribe(Topic, self(), [local, {share, Share}, {qos, ?QOS_0}]),
State = parse_opts(Options, #state{node = Node, subtopic = Topic}),
MQueue = emqx_mqueue:new(qname(Node, Topic),
[{max_len, State#state.max_queue_len}],

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -20,183 +20,328 @@
-include("emqx.hrl").
-include("emqx_internal.hrl").
-export([start_link/2]).
%% API Function Exports
-export([start_link/0]).
-export([subscribe/1, subscribe/2, subscribe/3, unsubscribe/1, unsubscribe/2]).
%% Event API
-export([subscribe/1, notify/2]).
-export([publish/1, publish/2]).
%% Broker API
-export([version/0, uptime/0, datetime/0, sysdescr/0, info/0]).
-export([subscriptions/1, subscribers/1, subscribed/2]).
%% Tick API
-export([start_tick/1, stop_tick/1]).
-export([topics/0]).
-export([getopts/2, setopts/3]).
-export([dump/0]).
%% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {started_at, sys_interval, heartbeat, ticker, version, sysdescr}).
-record(state, {pool, id, subids :: map(), submon :: emqx_pmon:pmon()}).
-define(APP, emqx).
-define(BROKER, ?MODULE).
-define(SERVER, ?MODULE).
-define(BROKER_TAB, mqtt_broker).
%% $SYS Topics of Broker
-define(SYSTOP_BROKERS, [
version, % Broker version
uptime, % Broker uptime
datetime, % Broker local datetime
sysdescr % Broker description
]).
-define(TIMEOUT, 120000).
%%--------------------------------------------------------------------
%% API
%% Start a broker
%%--------------------------------------------------------------------
%% @doc Start the broker
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}).
start_link(Pool, Id) ->
gen_server:start_link(?MODULE, [Pool, Id], [{hibernate_after, 1000}]).
%% @doc Subscribe broker event
-spec(subscribe(EventType :: any()) -> ok).
subscribe(EventType) ->
gproc:reg({p, l, {broker, EventType}}).
%%--------------------------------------------------------------------
%% Sub/Unsub
%%--------------------------------------------------------------------
%% @doc Notify broker event
-spec(notify(EventType :: any(), Event :: any()) -> ok).
notify(EventType, Event) ->
gproc:send({p, l, {broker, EventType}}, {notify, EventType, self(), Event}).
-spec(subscribe(topic()) -> ok | {error, term()}).
subscribe(Topic) when is_binary(Topic) ->
subscribe(Topic, self()).
%% @doc Get broker info
-spec(info() -> list(tuple())).
info() ->
[{version, version()},
{sysdescr, sysdescr()},
{uptime, uptime()},
{datetime, datetime()}].
-spec(subscribe(topic(), subscriber()) -> ok | {error, term()}).
subscribe(Topic, Subscriber) when is_binary(Topic) ->
subscribe(Topic, Subscriber, []).
%% @doc Get broker version
-spec(version() -> string()).
version() ->
{ok, Version} = application:get_key(?APP, vsn), Version.
-spec(subscribe(topic(), subscriber(), [suboption()]) -> ok | {error, term()}).
subscribe(Topic, Subscriber, Options) when is_binary(Topic) ->
subscribe(Topic, Subscriber, Options, ?TIMEOUT).
%% @doc Get broker description
-spec(sysdescr() -> string()).
sysdescr() ->
{ok, Descr} = application:get_key(?APP, description), Descr.
-spec(subscribe(topic(), subscriber(), [suboption()], timeout())
-> ok | {error, term()}).
subscribe(Topic, Subscriber, Options, Timeout) ->
{Topic1, Options1} = emqx_topic:parse(Topic, Options),
SubReq = {subscribe, Topic1, with_subpid(Subscriber), Options1},
async_call(pick(Subscriber), SubReq, Timeout).
%% @doc Get broker uptime
-spec(uptime() -> string()).
uptime() -> gen_server:call(?SERVER, uptime).
-spec(unsubscribe(topic()) -> ok | {error, term()}).
unsubscribe(Topic) when is_binary(Topic) ->
unsubscribe(Topic, self()).
%% @doc Get broker datetime
-spec(datetime() -> string()).
datetime() ->
{{Y, M, D}, {H, MM, S}} = calendar:local_time(),
lists:flatten(
io_lib:format(
"~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])).
-spec(unsubscribe(topic(), subscriber()) -> ok | {error, term()}).
unsubscribe(Topic, Subscriber) when is_binary(Topic) ->
unsubscribe(Topic, Subscriber, ?TIMEOUT).
%% @doc Start a tick timer.
start_tick(Msg) ->
start_tick(emqx:env(broker_sys_interval, 60000), Msg).
-spec(unsubscribe(topic(), subscriber(), timeout())
-> ok | {error, term()}).
unsubscribe(Topic, Subscriber, Timeout) ->
{Topic1, _} = emqx_topic:parse(Topic),
UnsubReq = {unsubscribe, Topic1, with_subpid(Subscriber)},
async_call(pick(Subscriber), UnsubReq, Timeout).
start_tick(0, _Msg) ->
undefined;
start_tick(Interval, Msg) when Interval > 0 ->
{ok, TRef} = timer:send_interval(Interval, Msg), TRef.
%%--------------------------------------------------------------------
%% Publish
%%--------------------------------------------------------------------
%% @doc Stop tick timer
stop_tick(undefined) ->
-spec(publish(message()) -> delivery() | stopped).
publish(Msg = #message{from = From}) ->
emqx_tracer:trace(publish, From, Msg),
case emqx_hooks:run('message.publish', [], Msg) of
{ok, Msg1 = #message{topic = Topic}} ->
publish(Topic, Msg1);
{stop, Msg1} ->
emqx_log:warning("Stop publishing: ~s", [emqx_message:format(Msg1)]),
stopped
end.
publish(Topic, Msg) ->
route(emqx_router:match_routes(Topic), delivery(Msg)).
route([], Delivery = #delivery{message = Msg}) ->
emqx_hooks:run('message.dropped', [undefined, Msg]),
dropped(Msg#message.topic), Delivery;
route([{To, Node}], Delivery) when Node =:= node() ->
dispatch(To, Delivery);
route([{To, Node}], Delivery = #delivery{flows = Flows}) when is_atom(Node) ->
forward(Node, To, Delivery#delivery{flows = [{route, Node, To}|Flows]});
route([{To, Group}], Delivery) when is_binary(Group) ->
emqx_shared_pubsub:dispatch(Group, To, Delivery);
route(Routes, Delivery) ->
lists:foldl(fun(Route, Acc) -> route([Route], Acc) end, Delivery, Routes).
%% @doc Forward message to another node.
forward(Node, To, Delivery) ->
case emqx_rpc:call(Node, ?BROKER, dispatch, [To, Delivery]) of
{badrpc, Reason} ->
emqx_log:error("[Broker] Failed to forward msg to ~s: ~s", [Node, Reason]),
Delivery;
Delivery1 -> Delivery1
end.
-spec(dispatch(topic(), delivery()) -> delivery()).
dispatch(Topic, Delivery = #delivery{message = Msg, flows = Flows}) ->
case subscribers(Topic) of
[] ->
emqx_hooks:run('message.dropped', [undefined, Msg]),
dropped(Topic), Delivery;
[Sub] -> %% optimize?
dispatch(Sub, Topic, Msg),
Delivery#delivery{flows = [{dispatch, Topic, 1}|Flows]};
Subscribers ->
Count = lists:foldl(fun(Sub, Acc) ->
dispatch(Sub, Topic, Msg), Acc + 1
end, 0, Subscribers),
Delivery#delivery{flows = [{dispatch, Topic, Count}|Flows]}
end.
dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
SubPid ! {dispatch, Topic, Msg};
dispatch({SubId, SubPid}, Topic, Msg) when is_binary(SubId), is_pid(SubPid) ->
SubPid ! {dispatch, Topic, Msg};
dispatch(SubId, Topic, Msg) when is_binary(SubId) ->
emqx_sm:dispatch(SubId, Topic, Msg);
dispatch({share, _Group, _Sub}, _Topic, _Msg) ->
ignore.
dropped(<<"$SYS/", _/binary>>) ->
ok;
stop_tick(TRef) ->
timer:cancel(TRef).
dropped(_Topic) ->
emqx_metrics:inc('messages/dropped').
delivery(Msg) ->
#delivery{message = Msg, flows = []}.
subscribers(Topic) ->
try ets:lookup_element(subscriber, Topic, 2) catch error:badarg -> [] end.
subscriptions(Subscriber) ->
lists:map(fun({_, {share, _Group, Topic}}) ->
subscription(Topic, Subscriber);
({_, Topic}) ->
subscription(Topic, Subscriber)
end, ets:lookup(subscription, Subscriber)).
subscription(Topic, Subscriber) ->
{Topic, ets:lookup_element(suboption, {Topic, Subscriber}, 2)}.
-spec(subscribed(topic(), subscriber()) -> boolean()).
subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) ->
ets:member(suboption, {Topic, SubPid});
subscribed(Topic, SubId) when is_binary(Topic), is_binary(SubId) ->
length(ets:match_object(suboption, {{Topic, {SubId, '_'}}, '_'}, 1)) == 1;
subscribed(Topic, {SubId, SubPid}) when is_binary(Topic), is_binary(SubId), is_pid(SubPid) ->
ets:member(suboption, {Topic, {SubId, SubPid}}).
topics() -> emqx_router:topics().
getopts(Topic, Subscriber) when is_binary(Topic) ->
try ets:lookup_element(suboption, {Topic, Subscriber}, 2) catch error:badarg ->[] end.
setopts(Topic, Subscriber, Opts) when is_binary(Topic), is_list(Opts) ->
gen_server:call(pick(Subscriber), {setopts, Topic, Subscriber, Opts}).
with_subpid(SubPid) when is_pid(SubPid) ->
SubPid;
with_subpid(SubId) when is_binary(SubId) ->
{SubId, self()};
with_subpid({SubId, SubPid}) when is_binary(SubId), is_pid(SubPid) ->
{SubId, SubPid}.
async_call(Broker, Msg, Timeout) ->
From = {self(), Tag = make_ref()},
ok = gen_server:cast(Broker, {From, Msg}),
receive
{Tag, Reply} -> Reply
after Timeout ->
{error, timeout}
end.
pick(SubPid) when is_pid(SubPid) ->
gproc_pool:pick_worker(broker, SubPid);
pick(SubId) when is_binary(SubId) ->
gproc_pool:pick_worker(broker, SubId);
pick({SubId, SubPid}) when is_binary(SubId), is_pid(SubPid) ->
pick(SubId).
dump() ->
[{Tab, ets:tab2list(Tab)} || Tab <- [subscription, subscriber, suboption]].
%%--------------------------------------------------------------------
%% gen_server Callbacks
%% gen_server callbacks
%%--------------------------------------------------------------------
init([]) ->
emqx_time:seed(),
ets:new(?BROKER_TAB, [set, public, named_table]),
% Tick
{ok, #state{started_at = os:timestamp(),
heartbeat = start_tick(1000, heartbeat),
version = list_to_binary(version()),
sysdescr = list_to_binary(sysdescr()),
ticker = start_tick(tick)}, hibernate}.
init([Pool, Id]) ->
gproc_pool:connect_worker(Pool, {Pool, Id}),
{ok, #state{pool = Pool, id = Id, subids = #{}, submon = emqx_pmon:new()}}.
handle_call(uptime, _From, State) ->
{reply, uptime(State), State};
handle_call({setopts, Topic, Subscriber, Opts}, _From, State) ->
case ets:lookup(suboption, {Topic, Subscriber}) of
[{_, OldOpts}] ->
Opts1 = lists:usort(lists:umerge(Opts, OldOpts)),
ets:insert(suboption, {{Topic, Subscriber}, Opts1}),
{reply, ok, State};
[] ->
{reply, {error, not_found}, State}
end;
handle_call(Req, _From, State) ->
?UNEXPECTED_REQ(Req, State).
handle_call(Request, _From, State) ->
emqx_log:error("[Broker] Unexpected request: ~p", [Request]),
{reply, ignore, State}.
handle_cast({From, {subscribe, Topic, Subscriber, Options}}, State) ->
case ets:lookup(suboption, {Topic, Subscriber}) of
[] ->
Group = proplists:get_value(share, Options),
true = do_subscribe(Group, Topic, Subscriber, Options),
emqx_shared_pubsub:subscribe(Group, Topic, subpid(Subscriber)),
emqx_router:add_route(From, Topic, dest(Options)),
{noreply, monitor_subscriber(Subscriber, State)};
[_] ->
gen_server:reply(From, ok),
{noreply, State}
end;
handle_cast({From, {unsubscribe, Topic, Subscriber}}, State) ->
case ets:lookup(suboption, {Topic, Subscriber}) of
[{_, Options}] ->
Group = proplists:get_value(share, Options),
true = do_unsubscribe(Group, Topic, Subscriber),
emqx_shared_pubsub:unsubscribe(Group, Topic, subpid(Subscriber)),
case ets:member(subscriber, Topic) of
false -> emqx_router:del_route(From, Topic, dest(Options));
true -> gen_server:reply(From, ok)
end;
[] -> gen_server:reply(From, ok)
end,
{noreply, State};
handle_cast(Msg, State) ->
?UNEXPECTED_MSG(Msg, State).
emqx_log:error("[Broker] Unexpected msg: ~p", [Msg]),
{noreply, State}.
handle_info(heartbeat, State) ->
publish(uptime, list_to_binary(uptime(State))),
publish(datetime, list_to_binary(datetime())),
{noreply, State, hibernate};
handle_info(tick, State = #state{version = Version, sysdescr = Descr}) ->
retain(brokers),
retain(version, Version),
retain(sysdescr, Descr),
{noreply, State, hibernate};
handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{subids = SubIds}) ->
Subscriber = case maps:find(SubPid, SubIds) of
{ok, SubId} -> {SubId, SubPid};
error -> SubPid
end,
Topics = lists:map(fun({_, {share, _, Topic}}) ->
Topic;
({_, Topic}) ->
Topic
end, ets:lookup(subscription, Subscriber)),
lists:foreach(fun(Topic) ->
case ets:lookup(suboption, {Topic, Subscriber}) of
[{_, Options}] ->
Group = proplists:get_value(share, Options),
true = do_unsubscribe(Group, Topic, Subscriber),
case ets:member(subscriber, Topic) of
false -> emqx_router:del_route(Topic, dest(Options));
true -> ok
end;
[] -> ok
end
end, Topics),
{noreply, demonitor_subscriber(SubPid, State)};
handle_info(Info, State) ->
?UNEXPECTED_INFO(Info, State).
emqx_log:error("[Broker] Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #state{heartbeat = Hb, ticker = TRef}) ->
stop_tick(Hb),
stop_tick(TRef),
ok.
terminate(_Reason, #state{pool = Pool, id = Id}) ->
gproc_pool:disconnect_worker(Pool, {Pool, Id}).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal functions
%% Internal Functions
%%--------------------------------------------------------------------
retain(brokers) ->
Payload = list_to_binary(string:join([atom_to_list(N) ||
N <- ekka_mnesia:running_nodes()], ",")),
Msg = emqx_message:make(broker, <<"$SYS/brokers">>, Payload),
emqx:publish(emqx_message:set_flag(sys, emqx_message:set_flag(retain, Msg))).
do_subscribe(Group, Topic, Subscriber, Options) ->
ets:insert(subscription, {Subscriber, shared(Group, Topic)}),
ets:insert(subscriber, {Topic, shared(Group, Subscriber)}),
ets:insert(suboption, {{Topic, Subscriber}, Options}).
retain(Topic, Payload) when is_binary(Payload) ->
Msg = emqx_message:make(broker, emqx_topic:systop(Topic), Payload),
emqx:publish(emqx_message:set_flag(sys, emqx_message:set_flag(retain, Msg))).
do_unsubscribe(Group, Topic, Subscriber) ->
ets:delete_object(subscription, {Subscriber, shared(Group, Topic)}),
ets:delete_object(subscriber, {Topic, shared(Group, Subscriber)}),
ets:delete(suboption, {Topic, Subscriber}).
publish(Topic, Payload) when is_binary(Payload) ->
Msg = emqx_message:make(broker, emqx_topic:systop(Topic), Payload),
emqx:publish(emqx_message:set_flag(sys, Msg)).
monitor_subscriber(SubPid, State = #state{submon = SubMon}) when is_pid(SubPid) ->
State#state{submon = SubMon:monitor(SubPid)};
uptime(#state{started_at = Ts}) ->
Secs = timer:now_diff(os:timestamp(), Ts) div 1000000,
lists:flatten(uptime(seconds, Secs)).
monitor_subscriber({SubId, SubPid}, State = #state{subids = SubIds, submon = SubMon}) ->
State#state{subids = maps:put(SubPid, SubId, SubIds), submon = SubMon:monitor(SubPid)}.
uptime(seconds, Secs) when Secs < 60 ->
[integer_to_list(Secs), " seconds"];
uptime(seconds, Secs) ->
[uptime(minutes, Secs div 60), integer_to_list(Secs rem 60), " seconds"];
uptime(minutes, M) when M < 60 ->
[integer_to_list(M), " minutes, "];
uptime(minutes, M) ->
[uptime(hours, M div 60), integer_to_list(M rem 60), " minutes, "];
uptime(hours, H) when H < 24 ->
[integer_to_list(H), " hours, "];
uptime(hours, H) ->
[uptime(days, H div 24), integer_to_list(H rem 24), " hours, "];
uptime(days, D) ->
[integer_to_list(D), " days,"].
demonitor_subscriber(SubPid, State = #state{subids = SubIds, submon = SubMon}) ->
State#state{subids = maps:remove(SubPid, SubIds), submon = SubMon:demonitor(SubPid)}.
dest(Options) ->
case proplists:get_value(share, Options) of
undefined -> node();
Group -> {Group, node()}
end.
subpid(SubPid) when is_pid(SubPid) ->
SubPid;
subpid({_SubId, SubPid}) when is_pid(SubPid) ->
SubPid.
shared(undefined, Name) -> Name;
shared(Group, Name) -> {share, Group, Name}.

View File

@ -0,0 +1,66 @@
%%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_broker_helper).
-behaviour(gen_server).
-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, {stats_fun, stats_timer}).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec(start_link(fun()) -> {ok, pid()} | ignore | {error, any()}).
start_link(StatsFun) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [StatsFun], []).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([StatsFun]) ->
{ok, TRef} = timer:send_interval(1000, stats),
{ok, #state{stats_fun = StatsFun, stats_timer = TRef}}.
handle_call(Req, _From, State) ->
emqx_log:error("[BrokerHelper] Unexpected request: ~p", [Req]),
{reply, ignore, State}.
handle_cast(Msg, State) ->
emqx_log:error("[BrokerHelper] Unexpected msg: ~p", [Msg]),
{noreply, State}.
handle_info(stats, State = #state{stats_fun = StatsFun}) ->
StatsFun(), {noreply, State, hibernate};
handle_info(Info, State) ->
emqx_log:error("[BrokerHelper] Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #state{stats_timer = TRef}) ->
timer:cancel(TRef).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -14,70 +14,80 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_pubsub_sup).
-module(emqx_broker_sup).
-behaviour(supervisor).
%% API
-export([start_link/0, pubsub_pool/0]).
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
-define(CONCURRENCY_OPTS, [{read_concurrency, true}, {write_concurrency, true}]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
pubsub_pool() ->
hd([Pid || {pubsub_pool, Pid, _, _} <- supervisor:which_children(?MODULE)]).
%%--------------------------------------------------------------------
%% Supervisor Callbacks
%% Supervisor callbacks
%%--------------------------------------------------------------------
init([]) ->
{ok, Env} = emqx:env(pubsub),
%% Create ETS Tables
[create_tab(Tab) || Tab <- [mqtt_subproperty, mqtt_subscriber, mqtt_subscription]],
{ok, { {one_for_all, 10, 3600}, [pool_sup(pubsub, Env), pool_sup(server, Env)]} }.
%% Create the pubsub tables
create_tabs(),
%% Shared pubsub
Shared = {shared_pubsub, {emqx_shared_pubsub, start_link, []},
permanent, 5000, worker, [emqx_shared_pubsub]},
%% Broker helper
Helper = {broker_helper, {emqx_broker_helper, start_link, [stats_fun()]},
permanent, 5000, worker, [emqx_broker_helper]},
%% Broker pool
PoolArgs = [broker, hash, emqx_sys:schedulers() * 2,
{emqx_broker, start_link, []}],
PoolSup = emqx_pool_sup:spec(broker_pool, PoolArgs),
{ok, {{one_for_all, 0, 3600}, [Shared, Helper, PoolSup]}}.
%%--------------------------------------------------------------------
%% Pool
%% Create tables
%%--------------------------------------------------------------------
pool_size(Env) ->
Schedulers = erlang:system_info(schedulers),
proplists:get_value(pool_size, Env, Schedulers).
create_tabs() ->
lists:foreach(fun create_tab/1, [subscription, subscriber, suboption]).
pool_sup(Name, Env) ->
Pool = list_to_atom(atom_to_list(Name) ++ "_pool"),
Mod = list_to_atom("emqx_" ++ atom_to_list(Name)),
MFA = {Mod, start_link, [Env]},
emqx_pool_sup:spec(Pool, [Name, hash, pool_size(Env), MFA]).
create_tab(suboption) ->
%% Suboption: {Topic, Sub} -> [{qos, 1}]
ensure_tab(suboption, [set | ?CONCURRENCY_OPTS]);
%%--------------------------------------------------------------------
%% Create PubSub Tables
%%--------------------------------------------------------------------
create_tab(mqtt_subproperty) ->
%% Subproperty: {Topic, Sub} -> [{qos, 1}]
ensure_tab(mqtt_subproperty, [public, named_table, set | ?CONCURRENCY_OPTS]);
create_tab(mqtt_subscriber) ->
create_tab(subscriber) ->
%% Subscriber: Topic -> Sub1, Sub2, Sub3, ..., SubN
%% duplicate_bag: o(1) insert
ensure_tab(mqtt_subscriber, [public, named_table, duplicate_bag | ?CONCURRENCY_OPTS]);
ensure_tab(subscriber, [duplicate_bag | ?CONCURRENCY_OPTS]);
create_tab(mqtt_subscription) ->
create_tab(subscription) ->
%% Subscription: Sub -> Topic1, Topic2, Topic3, ..., TopicN
%% bag: o(n) insert
ensure_tab(mqtt_subscription, [public, named_table, bag | ?CONCURRENCY_OPTS]).
ensure_tab(subscription, [bag | ?CONCURRENCY_OPTS]).
ensure_tab(Tab, Opts) ->
case ets:info(Tab, name) of undefined -> ets:new(Tab, Opts); _ -> ok end.
case ets:info(Tab, name) of
undefined ->
ets:new(Tab, lists:usort([public, named_table | Opts]));
Tab -> Tab
end.
%%--------------------------------------------------------------------
%% Stats function
%%--------------------------------------------------------------------
stats_fun() ->
fun() ->
emqx_stats:setstat('subscribers/count', 'subscribers/max',
ets:info(subscriber, size)),
emqx_stats:setstat('subscriptions/count', 'subscriptions/max',
ets:info(subscription, size))
end.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_conn).
-module(emqx_connection).
-behaviour(gen_server).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

51
src/emqx_log.erl Normal file
View File

@ -0,0 +1,51 @@
%%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_log).
-compile({no_auto_import,[error/1]}).
-export([debug/1, debug/2,
info/1, info/2,
warning/1, warning/2,
error/1, error/2,
critical/1, critical/2]).
debug(Msg) ->
lager:debug(Msg).
debug(Format, Args) ->
lager:debug(Format, Args).
info(Msg) ->
lager:info(Msg).
info(Format, Args) ->
lager:info(Format, Args).
warning(Msg) ->
lager:warning(Msg).
warning(Format, Args) ->
lager:warning(Format, Args).
error(Msg) ->
lager:error(Msg).
error(Format, Args) ->
lager:error(Format, Args).
critical(Msg) ->
lager:critical(Msg).
critical(Format, Args) ->
lager:critical(Format, Args).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -436,7 +436,7 @@ maybe_set_clientid(State) ->
send_willmsg(_Client, undefined) ->
ignore;
send_willmsg(#mqtt_client{client_id = ClientId, username = Username}, WillMsg) ->
emqx_server:publish(WillMsg#mqtt_message{from = {ClientId, Username}}).
emqx_broker:publish(WillMsg#mqtt_message{from = {ClientId, Username}}).
start_keepalive(0, _State) -> ignore;

View File

@ -1,254 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_pubsub).
-behaviour(gen_server).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-export([start_link/3]).
%% PubSub API.
-export([subscribe/3, async_subscribe/3, publish/2, unsubscribe/3,
async_unsubscribe/3, subscribers/1]).
-export([dispatch/2]).
%% gen_server Callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {pool, id, env}).
-define(PUBSUB, ?MODULE).
-define(is_local(Options), lists:member(local, Options)).
%%--------------------------------------------------------------------
%% Start PubSub
%%--------------------------------------------------------------------
-spec(start_link(atom(), pos_integer(), list()) -> {ok, pid()} | ignore | {error, term()}).
start_link(Pool, Id, Env) ->
gen_server:start_link(?MODULE, [Pool, Id, Env], [{hibernate_after, 10000}]).
%%--------------------------------------------------------------------
%% PubSub API
%%--------------------------------------------------------------------
%% @doc Subscribe to a Topic
-spec(subscribe(binary(), emqx:subscriber(), [emqx:suboption()]) -> ok).
subscribe(Topic, Subscriber, Options) ->
call(pick(Topic), {subscribe, Topic, Subscriber, Options}).
-spec(async_subscribe(binary(), emqx:subscriber(), [emqx:suboption()]) -> ok).
async_subscribe(Topic, Subscriber, Options) ->
cast(pick(Topic), {subscribe, Topic, Subscriber, Options}).
%% @doc Publish MQTT Message to a Topic
-spec(publish(binary(), mqtt_message()) -> {ok, mqtt_delivery()} | ignore).
publish(Topic, Msg) ->
route(lists:append(emqx_router:match(Topic),
emqx_router:match_local(Topic)), delivery(Msg)).
route([], #mqtt_delivery{message = Msg}) ->
emqx_hooks:run('message.dropped', [undefined, Msg]),
dropped(Msg#mqtt_message.topic), ignore;
%% Dispatch on the local node.
route([#route{topic = To, node = Node}],
Delivery = #mqtt_delivery{flows = Flows}) when Node =:= node() ->
dispatch(To, Delivery#mqtt_delivery{flows = [{route, Node, To} | Flows]});
%% Forward to other nodes
route([#route{topic = To, node = Node}], Delivery = #mqtt_delivery{flows = Flows}) ->
forward(Node, To, Delivery#mqtt_delivery{flows = [{route, Node, To}|Flows]});
route(Routes, Delivery) ->
{ok, lists:foldl(fun(Route, Acc) ->
{ok, Acc1} = route([Route], Acc), Acc1
end, Delivery, Routes)}.
delivery(Msg) -> #mqtt_delivery{sender = self(), message = Msg, flows = []}.
%% @doc Forward message to another node...
forward(Node, To, Delivery) ->
emqx_rpc:cast(Node, ?PUBSUB, dispatch, [To, Delivery]), {ok, Delivery}.
%% @doc Dispatch Message to Subscribers.
-spec(dispatch(binary(), mqtt_delivery()) -> mqtt_delivery()).
dispatch(Topic, Delivery = #mqtt_delivery{message = Msg, flows = Flows}) ->
case subscribers(Topic) of
[] ->
emqx_hooks:run('message.dropped', [undefined, Msg]),
dropped(Topic), {ok, Delivery};
[Sub] -> %% optimize?
dispatch(Sub, Topic, Msg),
{ok, Delivery#mqtt_delivery{flows = [{dispatch, Topic, 1}|Flows]}};
Subscribers ->
Flows1 = [{dispatch, Topic, length(Subscribers)} | Flows],
lists:foreach(fun(Sub) -> dispatch(Sub, Topic, Msg) end, Subscribers),
{ok, Delivery#mqtt_delivery{flows = Flows1}}
end.
%%TODO: Is SubPid aliving???
dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
SubPid ! {dispatch, Topic, Msg};
dispatch({SubId, SubPid}, Topic, Msg) when is_binary(SubId), is_pid(SubPid) ->
SubPid ! {dispatch, Topic, Msg};
dispatch({{share, _Share}, [Sub]}, Topic, Msg) ->
dispatch(Sub, Topic, Msg);
dispatch({{share, _Share}, []}, _Topic, _Msg) ->
ok;
dispatch({{share, _Share}, Subs}, Topic, Msg) -> %% round-robbin?
dispatch(lists:nth(rand:uniform(length(Subs)), Subs), Topic, Msg).
subscribers(Topic) ->
group_by_share(try ets:lookup_element(mqtt_subscriber, Topic, 2) catch error:badarg -> [] end).
group_by_share([]) -> [];
group_by_share(Subscribers) ->
{Subs1, Shares1} =
lists:foldl(fun({share, Share, Sub}, {Subs, Shares}) ->
{Subs, dict:append({share, Share}, Sub, Shares)};
(Sub, {Subs, Shares}) ->
{[Sub|Subs], Shares}
end, {[], dict:new()}, Subscribers),
lists:append(Subs1, dict:to_list(Shares1)).
%% @private
%% @doc Ingore $SYS Messages.
dropped(<<"$SYS/", _/binary>>) ->
ok;
dropped(_Topic) ->
emqx_metrics:inc('messages/dropped').
%% @doc Unsubscribe
-spec(unsubscribe(binary(), emqx:subscriber(), [emqx:suboption()]) -> ok).
unsubscribe(Topic, Subscriber, Options) ->
call(pick(Topic), {unsubscribe, Topic, Subscriber, Options}).
-spec(async_unsubscribe(binary(), emqx:subscriber(), [emqx:suboption()]) -> ok).
async_unsubscribe(Topic, Subscriber, Options) ->
cast(pick(Topic), {unsubscribe, Topic, Subscriber, Options}).
call(PubSub, Req) when is_pid(PubSub) ->
gen_server:call(PubSub, Req, infinity).
cast(PubSub, Msg) when is_pid(PubSub) ->
gen_server:cast(PubSub, Msg).
pick(Topic) ->
gproc_pool:pick_worker(pubsub, Topic).
%%--------------------------------------------------------------------
%% gen_server Callbacks
%%--------------------------------------------------------------------
init([Pool, Id, Env]) ->
gproc_pool:connect_worker(Pool, {Pool, Id}),
{ok, #state{pool = Pool, id = Id, env = Env}, hibernate}.
handle_call({subscribe, Topic, Subscriber, Options}, _From, State) ->
add_subscriber(Topic, Subscriber, Options),
reply(ok, setstats(State));
handle_call({unsubscribe, Topic, Subscriber, Options}, _From, State) ->
del_subscriber(Topic, Subscriber, Options),
reply(ok, setstats(State));
handle_call(Req, _From, State) ->
lager:error("[~s] Unexpected Call: ~p", [?MODULE, Req]),
{reply, ignore, State}.
handle_cast({subscribe, Topic, Subscriber, Options}, State) ->
add_subscriber(Topic, Subscriber, Options),
noreply(setstats(State));
handle_cast({unsubscribe, Topic, Subscriber, Options}, State) ->
del_subscriber(Topic, Subscriber, Options),
noreply(setstats(State));
handle_cast(Msg, State) ->
lager:error("[~s] Unexpected Cast: ~p", [?MODULE, Msg]),
{noreply, State}.
handle_info(Info, State) ->
lager:error("[~s] Unexpected Info: ~p", [?MODULE, Info]),
{noreply, State}.
terminate(_Reason, #state{pool = Pool, id = Id}) ->
gproc_pool:disconnect_worker(Pool, {Pool, Id}).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internel Functions
%%--------------------------------------------------------------------
add_subscriber(Topic, Subscriber, Options) ->
Share = proplists:get_value(share, Options),
case ?is_local(Options) of
false -> add_global_subscriber(Share, Topic, Subscriber);
true -> add_local_subscriber(Share, Topic, Subscriber)
end.
add_global_subscriber(Share, Topic, Subscriber) ->
case ets:member(mqtt_subscriber, Topic) and emqx_router:has_route(Topic) of
true -> ok;
false -> emqx_router:add_route(Topic)
end,
ets:insert(mqtt_subscriber, {Topic, shared(Share, Subscriber)}).
add_local_subscriber(Share, Topic, Subscriber) ->
(not ets:member(mqtt_subscriber, {local, Topic})) andalso emqx_router:add_local_route(Topic),
ets:insert(mqtt_subscriber, {{local, Topic}, shared(Share, Subscriber)}).
del_subscriber(Topic, Subscriber, Options) ->
Share = proplists:get_value(share, Options),
case ?is_local(Options) of
false -> del_global_subscriber(Share, Topic, Subscriber);
true -> del_local_subscriber(Share, Topic, Subscriber)
end.
del_global_subscriber(Share, Topic, Subscriber) ->
ets:delete_object(mqtt_subscriber, {Topic, shared(Share, Subscriber)}),
(not ets:member(mqtt_subscriber, Topic)) andalso emqx_router:del_route(Topic).
del_local_subscriber(Share, Topic, Subscriber) ->
ets:delete_object(mqtt_subscriber, {{local, Topic}, shared(Share, Subscriber)}),
(not ets:member(mqtt_subscriber, {local, Topic})) andalso emqx_router:del_local_route(Topic).
shared(undefined, Subscriber) ->
Subscriber;
shared(Share, Subscriber) ->
{share, Share, Subscriber}.
setstats(State) ->
emqx_stats:setstats('subscribers/count', 'subscribers/max', ets:info(mqtt_subscriber, size)),
State.
reply(Reply, State) ->
{reply, Reply, State}.
noreply(State) ->
{noreply, State}.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -26,35 +26,27 @@
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
-export([start_link/1]).
%% For eunit tests
-export([start/0, stop/0]).
%% Start
-export([start_link/2]).
%% Topics
-export([topics/0, local_topics/0]).
-export([topics/0]).
%% Route APIs
-export([add_route/1, get_routes/1, del_route/1, has_route/1]).
%% Route Management APIs
-export([add_route/2, add_route/3, get_routes/1, del_route/2, del_route/3]).
%% Match and print
-export([match/1, print/1]).
%% Match, print routes
-export([has_routes/1, match_routes/1, print_routes/1]).
%% Local Route API
-export([get_local_routes/0, add_local_route/1, match_local/1,
del_local_route/1, clean_local_routes/0]).
-export([dump/0]).
%% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([dump/0]).
-record(state, {pool, id}).
-record(state, {stats_fun, stats_timer}).
-define(ROUTER, ?MODULE).
-define(LOCK, {?ROUTER, clean_routes}).
-type(destination() :: node() | {binary(), node()}).
%%--------------------------------------------------------------------
%% Mnesia Bootstrap
@ -68,14 +60,46 @@ mnesia(boot) ->
{attributes, record_info(fields, route)}]);
mnesia(copy) ->
ok = ekka_mnesia:copy_table(route, ram_copies).
ok = ekka_mnesia:copy_table(route).
%%--------------------------------------------------------------------
%% Start the Router
%% Start a router
%%--------------------------------------------------------------------
start_link(StatsFun) ->
gen_server:start_link({local, ?ROUTER}, ?MODULE, [StatsFun], []).
start_link(Pool, Id) ->
gen_server:start_link(?MODULE, [Pool, Id], [{hibernate_after, 1000}]).
%%--------------------------------------------------------------------
%% Add/Del Routes
%%--------------------------------------------------------------------
%% @doc Add a route
-spec(add_route(topic(), destination()) -> ok).
add_route(Topic, Dest) when is_binary(Topic) ->
cast(pick(Topic), {add_route, #route{topic = Topic, dest = Dest}}).
-spec(add_route({pid(), reference()}, topic(), destination()) -> ok).
add_route(From, Topic, Dest) when is_binary(Topic) ->
cast(pick(Topic), {add_route, From, #route{topic = Topic, dest = Dest}}).
%% @doc Get routes
-spec(get_routes(topic()) -> [route()]).
get_routes(Topic) ->
ets:lookup(route, Topic).
%% @doc Delete a route
-spec(del_route(topic(), destination()) -> ok).
del_route(Topic, Dest) when is_binary(Topic) ->
cast(pick(Topic), {del_route, #route{topic = Topic, dest = Dest}}).
-spec(del_route({pid(), reference()}, topic(), destination()) -> ok).
del_route(From, Topic, Dest) when is_binary(Topic) ->
cast(pick(Topic), {del_route, From, #route{topic = Topic, dest = Dest}}).
%% @doc Has routes?
-spec(has_routes(topic()) -> boolean()).
has_routes(Topic) when is_binary(Topic) ->
ets:member(route, Topic).
%%--------------------------------------------------------------------
%% Topics
@ -85,44 +109,116 @@ start_link(StatsFun) ->
topics() ->
mnesia:dirty_all_keys(route).
-spec(local_topics() -> list(binary())).
local_topics() ->
ets:select(local_route, [{{'$1', '_'}, [], ['$1']}]).
%%--------------------------------------------------------------------
%% Match API
%% Match Routes
%%--------------------------------------------------------------------
%% @doc Match Routes.
-spec(match(Topic:: binary()) -> [route()]).
match(Topic) when is_binary(Topic) ->
%% @doc Match routes
-spec(match_routes(Topic:: topic()) -> [{topic(), binary() | node()}]).
match_routes(Topic) when is_binary(Topic) ->
%% Optimize: ets???
Matched = mnesia:ets(fun emqx_trie:match/1, [Topic]),
%% Optimize: route table will be replicated to all nodes.
lists:append([ets:lookup(route, To) || To <- [Topic | Matched]]).
aggre(lists:append([ets:lookup(route, To) || To <- [Topic | Matched]])).
%% @doc Print Routes.
-spec(print(Topic :: binary()) -> [ok]).
print(Topic) ->
[io:format("~s -> ~s~n", [To, Node]) ||
#route{topic = To, node = Node} <- match(Topic)].
%% Aggregate routes
aggre([]) ->
[];
aggre([#route{topic = To, dest = Node}]) when is_atom(Node) ->
[{To, Node}];
aggre([#route{topic = To, dest = {Group, _Node}}]) ->
[{To, Group}];
aggre(Routes) ->
lists:foldl(
fun(#route{topic = To, dest = Node}, Acc) when is_atom(Node) ->
[{To, Node} | Acc];
(#route{topic = To, dest = {Group, _}}, Acc) ->
lists:usort([{To, Group} | Acc])
end, [], Routes).
%%--------------------------------------------------------------------
%% Route Management API
%% Print Routes
%%--------------------------------------------------------------------
%% @doc Add Route.
-spec(add_route(binary() | route()) -> ok | {error, Reason :: term()}).
add_route(Topic) when is_binary(Topic) ->
add_route(#route{topic = Topic, node = node()});
add_route(Route = #route{topic = Topic}) ->
case emqx_topic:wildcard(Topic) of
true -> case mnesia:is_transaction() of
true -> add_trie_route(Route);
false -> trans(fun add_trie_route/1, [Route])
end;
false -> add_direct_route(Route)
end.
%% @doc Print routes to a topic
-spec(print_routes(topic()) -> ok).
print_routes(Topic) ->
lists:foreach(fun({To, Dest}) ->
io:format("~s -> ~s~n", [To, Dest])
end, match_routes(Topic)).
cast(Router, Msg) ->
gen_server:cast(Router, Msg).
pick(Topic) ->
gproc_pool:pick_worker(router, Topic).
%%FIXME: OOM?
dump() ->
[{route, [{To, Dest} || #route{topic = To, dest = Dest} <- ets:tab2list(route)]}].
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([Pool, Id]) ->
gproc_pool:connect_worker(Pool, {Pool, Id}),
{ok, #state{pool = Pool, id = Id}}.
handle_call(Req, _From, State) ->
emqx_log:error("[Router] Unexpected request: ~p", [Req]),
{reply, ignore, State}.
handle_cast({add_route, From, Route}, State) ->
_ = handle_cast({add_route, Route}, State),
gen_server:reply(From, ok),
{noreply, State};
handle_cast({add_route, Route = #route{topic = Topic, dest = Dest}}, State) ->
case lists:member(Route, ets:lookup(route, Topic)) of
true -> ok;
false ->
ok = emqx_router_helper:monitor(Dest),
case emqx_topic:wildcard(Topic) of
true -> trans(fun add_trie_route/1, [Route]);
false -> add_direct_route(Route)
end
end,
{noreply, State};
handle_cast({del_route, From, Route}, State) ->
_ = handle_cast({del_route, Route}, State),
gen_server:reply(From, ok),
{noreply, State};
handle_cast({del_route, Route = #route{topic = Topic}}, State) ->
%% Confirm if there are still subscribers...
case ets:member(subscriber, Topic) of
true -> ok;
false ->
case emqx_topic:wildcard(Topic) of
true -> trans(fun del_trie_route/1, [Route]);
false -> del_direct_route(Route)
end
end,
{noreply, State};
handle_cast(Msg, State) ->
emqx_log:error("[Router] Unexpected msg: ~p", [Msg]),
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, #state{pool = Pool, id = Id}) ->
gproc_pool:disconnect_worker(Pool, {Pool, Id}).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal Functions
%%--------------------------------------------------------------------
add_direct_route(Route) ->
mnesia:async_dirty(fun mnesia:write/1, [Route]).
@ -134,24 +230,6 @@ add_trie_route(Route = #route{topic = Topic}) ->
end,
mnesia:write(Route).
%% @doc Lookup Routes
-spec(get_routes(binary()) -> [route()]).
get_routes(Topic) ->
ets:lookup(route, Topic).
%% @doc Delete Route
-spec(del_route(binary() | route()) -> ok | {error, Reason :: term()}).
del_route(Topic) when is_binary(Topic) ->
del_route(#route{topic = Topic, node = node()});
del_route(Route = #route{topic = Topic}) ->
case emqx_topic:wildcard(Topic) of
true -> case mnesia:is_transaction() of
true -> del_trie_route(Route);
false -> trans(fun del_trie_route/1, [Route])
end;
false -> del_direct_route(Route)
end.
del_direct_route(Route) ->
mnesia:async_dirty(fun mnesia:delete_object/1, [Route]).
@ -165,126 +243,13 @@ del_trie_route(Route = #route{topic = Topic}) ->
[] -> ok
end.
%% @doc Has route?
-spec(has_route(binary()) -> boolean()).
has_route(Topic) when is_binary(Topic) ->
ets:member(route, Topic).
%% @private
-spec(trans(function(), list(any())) -> ok | {error, term()}).
trans(Fun, Args) ->
case mnesia:transaction(Fun, Args) of
{atomic, _} -> ok;
{aborted, Error} -> {error, Error}
{aborted, Error} ->
emqx_log:error("[Router] Mnesia aborted: ~p", [Error]),
{error, Error}
end.
%%--------------------------------------------------------------------
%% Local Route API
%%--------------------------------------------------------------------
-spec(get_local_routes() -> list({binary(), node()})).
get_local_routes() ->
ets:tab2list(local_route).
-spec(add_local_route(binary()) -> ok).
add_local_route(Topic) ->
gen_server:call(?ROUTER, {add_local_route, Topic}).
-spec(del_local_route(binary()) -> ok).
del_local_route(Topic) ->
gen_server:call(?ROUTER, {del_local_route, Topic}).
-spec(match_local(binary()) -> [route()]).
match_local(Name) ->
case ets:info(local_route, size) of
0 -> [];
_ -> ets:foldl(
fun({Filter, Node}, Matched) ->
case emqx_topic:match(Name, Filter) of
true -> [#route{topic = {local, Filter}, node = Node} | Matched];
false -> Matched
end
end, [], local_route)
end.
-spec(clean_local_routes() -> ok).
clean_local_routes() ->
gen_server:call(?ROUTER, clean_local_routes).
dump() ->
[{route, ets:tab2list(route)}, {local_route, ets:tab2list(local_route)}].
%% For unit test.
start() ->
gen_server:start({local, ?ROUTER}, ?MODULE, [], []).
stop() ->
gen_server:call(?ROUTER, stop).
%%--------------------------------------------------------------------
%% gen_server Callbacks
%%--------------------------------------------------------------------
init([StatsFun]) ->
ekka:monitor(membership),
ets:new(local_route, [set, named_table, protected]),
{ok, TRef} = timer:send_interval(timer:seconds(1), stats),
{ok, #state{stats_fun = StatsFun, stats_timer = TRef}}.
handle_call({add_local_route, Topic}, _From, State) ->
%% why node()...?
ets:insert(local_route, {Topic, node()}),
{reply, ok, State};
handle_call({del_local_route, Topic}, _From, State) ->
ets:delete(local_route, Topic),
{reply, ok, State};
handle_call(clean_local_routes, _From, State) ->
ets:delete_all_objects(local_route),
{reply, ok, State};
handle_call(stop, _From, State) ->
{stop, normal, ok, State};
handle_call(_Req, _From, State) ->
{reply, ignore, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({membership, {mnesia, down, Node}}, State) ->
global:trans({?LOCK, self()}, fun() -> clean_routes_(Node) end),
handle_info(stats, State);
handle_info({membership, _Event}, State) ->
%% ignore
{noreply, State};
handle_info(stats, State = #state{stats_fun = StatsFun}) ->
StatsFun(mnesia:table_info(route, size)),
{noreply, State, hibernate};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, #state{stats_timer = TRef}) ->
timer:cancel(TRef),
ekka:unmonitor(membership).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal Functions
%%--------------------------------------------------------------------
%% Clean routes on the down node.
clean_routes_(Node) ->
Pattern = #route{_ = '_', node = Node},
Clean = fun() ->
[mnesia:delete_object(route, R, write) ||
R <- mnesia:match_object(route, Pattern, write)]
end,
mnesia:transaction(Clean).

158
src/emqx_router_helper.erl Normal file
View File

@ -0,0 +1,158 @@
%%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_router_helper).
-behaviour(gen_server).
-include("emqx.hrl").
%% Mnesia Bootstrap
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
%% API
-export([start_link/1, monitor/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(routing_node, {name, ts}).
-record(state, {nodes = [], stats_fun, stats_timer}).
-compile({no_auto_import, [monitor/1]}).
-define(SERVER, ?MODULE).
-define(TABLE, routing_node).
-define(LOCK, {?MODULE, clean_routes}).
%%--------------------------------------------------------------------
%% Mnesia bootstrap
%%--------------------------------------------------------------------
mnesia(boot) ->
ok = ekka_mnesia:create_table(?TABLE, [
{type, set},
{ram_copies, [node()]},
{record_name, routing_node},
{attributes, record_info(fields, routing_node)}]);
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?TABLE).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
%% @doc Starts the router helper
-spec(start_link(fun()) -> {ok, pid()} | ignore | {error, any()}).
start_link(StatsFun) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [StatsFun], []).
%% @doc Monitor routing node
-spec(monitor(node()) -> ok).
monitor({_Group, Node}) ->
monitor(Node);
monitor(Node) when is_atom(Node) ->
case ekka:is_member(Node) orelse ets:member(?TABLE, Node) of
true -> ok;
false -> mnesia:dirty_write(#routing_node{name = Node, ts = os:timestamp()})
end.
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([StatsFun]) ->
ekka:monitor(membership),
mnesia:subscribe({table, ?TABLE, simple}),
Nodes = lists:foldl(
fun(Node, Acc) ->
case ekka:is_member(Node) of
true -> Acc;
false -> _ = erlang:monitor_node(Node, true),
[Node | Acc]
end
end, [], mnesia:dirty_all_keys(?TABLE)),
{ok, TRef} = timer:send_interval(timer:seconds(1), stats),
{ok, #state{nodes = Nodes, stats_fun = StatsFun, stats_timer = TRef}}.
handle_call(Req, _From, State) ->
emqx_log:error("[RouterHelper] Unexpected request: ~p", [Req]),
{reply, ignore, State}.
handle_cast(Msg, State) ->
emqx_log:error("[RouterHelper] Unexpected msg: ~p", [Msg]),
{noreply, State}.
handle_info({mnesia_table_event, {write, #routing_node{name = Node}, _}},
State = #state{nodes = Nodes}) ->
emqx_log:info("[RouterHelper] New routing node: ~s", [Node]),
case ekka:is_member(Node) orelse lists:member(Node, Nodes) of
true -> {noreply, State};
false -> _ = erlang:monitor_node(Node, true),
{noreply, State#state{nodes = [Node | Nodes]}}
end;
handle_info({mnesia_table_event, _Event}, State) ->
{noreply, State};
handle_info({nodedown, Node}, State = #state{nodes = Nodes}) ->
global:trans({?LOCK, self()},
fun() ->
mnesia:transaction(fun clean_routes/1, [Node])
end),
mnesia:dirty_delete(routing_node, Node),
handle_info(stats, State#state{nodes = lists:delete(Node, Nodes)});
handle_info({membership, {mnesia, down, Node}}, State) ->
handle_info({nodedown, Node}, State);
handle_info({membership, _Event}, State) ->
{noreply, State};
handle_info(stats, State = #state{stats_fun = StatsFun}) ->
ok = StatsFun(mnesia:table_info(route, size)),
{noreply, State, hibernate};
handle_info(Info, State) ->
emqx_log:error("[RouteHelper] Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #state{stats_timer = TRef}) ->
timer:cancel(TRef),
ekka:unmonitor(membership),
mnesia:unsubscribe({table, ?TABLE, simple}).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
clean_routes(Node) ->
Patterns = [#route{_ = '_', dest = Node},
#route{_ = '_', dest = {'_', Node}}],
[mnesia:delete_object(R) || P <- Patterns,
R <- mnesia:match_object(P)].

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -27,12 +27,15 @@ start_link() ->
init([]) ->
StatsFun = emqx_stats:statsfun('routes/count', 'routes/max'),
SupFlags = #{strategy => one_for_all, intensity => 1, period => 5},
Router = #{id => emqx_router,
start => {emqx_router, start_link, [StatsFun]},
restart => permanent,
shutdown => 30000,
type => worker,
modules => [emqx_router]},
{ok, {SupFlags, [Router]}}.
%% Router helper
Helper = {router_helper, {emqx_router_helper, start_link, [StatsFun]},
permanent, 5000, worker, [emqx_router_helper]},
%% Router pool
PoolSup = emqx_pool_sup:spec(router_pool,
[router, hash, emqx_sys:schedulers(),
{emqx_router, start_link, []}]),
{ok, {{one_for_all, 0, 3600}, [Helper, PoolSup]}}.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,326 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_server).
-behaviour(gen_server).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include("emqx_internal.hrl").
-export([start_link/3]).
%% PubSub API.
-export([subscribe/1, subscribe/2, subscribe/3, publish/1,
unsubscribe/1, unsubscribe/2]).
%% Async PubSub API.
-export([async_subscribe/1, async_subscribe/2, async_subscribe/3,
async_unsubscribe/1, async_unsubscribe/2]).
%% Management API.
-export([setqos/3, subscriptions/1, subscribers/1, subscribed/2]).
%% Debug API
-export([dump/0]).
%% gen_server Callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {pool, id, env, subids :: map(), submon :: emqx_pmon:pmon()}).
%% @doc Start the server
-spec(start_link(atom(), pos_integer(), list()) -> {ok, pid()} | ignore | {error, term()}).
start_link(Pool, Id, Env) ->
gen_server:start_link(?MODULE, [Pool, Id, Env], [{hibernate_after, 10000}]).
%%--------------------------------------------------------------------
%% PubSub API
%%--------------------------------------------------------------------
%% @doc Subscribe to a Topic.
-spec(subscribe(binary()) -> ok | {error, term()}).
subscribe(Topic) when is_binary(Topic) ->
subscribe(Topic, self()).
-spec(subscribe(binary(), emqx:subscriber()) -> ok | {error, term()}).
subscribe(Topic, Subscriber) when is_binary(Topic) ->
subscribe(Topic, Subscriber, []).
-spec(subscribe(binary(), emqx:subscriber(), [emqx:suboption()]) ->
ok | {error, term()}).
subscribe(Topic, Subscriber, Options) when is_binary(Topic) ->
call(pick(Subscriber), {subscribe, Topic, with_subpid(Subscriber), Options}).
%% @doc Subscribe to a Topic asynchronously.
-spec(async_subscribe(binary()) -> ok).
async_subscribe(Topic) when is_binary(Topic) ->
async_subscribe(Topic, self()).
-spec(async_subscribe(binary(), emqx:subscriber()) -> ok).
async_subscribe(Topic, Subscriber) when is_binary(Topic) ->
async_subscribe(Topic, Subscriber, []).
-spec(async_subscribe(binary(), emqx:subscriber(), [emqx:suboption()]) -> ok).
async_subscribe(Topic, Subscriber, Options) when is_binary(Topic) ->
cast(pick(Subscriber), {subscribe, Topic, with_subpid(Subscriber), Options}).
%% @doc Publish a message
-spec(publish(mqtt_message()) -> {ok, mqtt_delivery()} | ignore).
publish(Msg = #mqtt_message{from = From}) ->
trace(publish, From, Msg),
case emqx_hooks:run('message.publish', [], Msg) of
{ok, Msg1 = #mqtt_message{topic = Topic}} ->
emqx_pubsub:publish(Topic, Msg1);
{stop, Msg1} ->
lager:warning("Stop publishing: ~s", [emqx_message:format(Msg1)]),
ignore
end.
%% @private
trace(publish, From, _Msg) when is_atom(From) ->
%% Dont' trace '$SYS' publish
ignore;
trace(publish, {ClientId, Username}, #mqtt_message{topic = Topic, payload = Payload}) ->
lager:info([{client, ClientId}, {topic, Topic}],
"~s/~s PUBLISH to ~s: ~p", [ClientId, Username, Topic, Payload]);
trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) when is_binary(From); is_list(From) ->
lager:info([{client, From}, {topic, Topic}],
"~s PUBLISH to ~s: ~p", [From, Topic, Payload]).
%% @doc Unsubscribe
-spec(unsubscribe(binary()) -> ok | {error, term()}).
unsubscribe(Topic) when is_binary(Topic) ->
unsubscribe(Topic, self()).
%% @doc Unsubscribe
-spec(unsubscribe(binary(), emqx:subscriber()) -> ok | {error, term()}).
unsubscribe(Topic, Subscriber) when is_binary(Topic) ->
call(pick(Subscriber), {unsubscribe, Topic, with_subpid(Subscriber)}).
%% @doc Async Unsubscribe
-spec(async_unsubscribe(binary()) -> ok).
async_unsubscribe(Topic) when is_binary(Topic) ->
async_unsubscribe(Topic, self()).
-spec(async_unsubscribe(binary(), emqx:subscriber()) -> ok).
async_unsubscribe(Topic, Subscriber) when is_binary(Topic) ->
cast(pick(Subscriber), {unsubscribe, Topic, with_subpid(Subscriber)}).
-spec(setqos(binary(), emqx:subscriber(), mqtt_qos()) -> ok).
setqos(Topic, Subscriber, Qos) when is_binary(Topic) ->
call(pick(Subscriber), {setqos, Topic, with_subpid(Subscriber), Qos}).
with_subpid(SubPid) when is_pid(SubPid) ->
SubPid;
with_subpid(SubId) when is_binary(SubId) ->
{SubId, self()};
with_subpid({SubId, SubPid}) when is_binary(SubId), is_pid(SubPid) ->
{SubId, SubPid}.
-spec(subscriptions(emqx:subscriber()) -> [{emqx:subscriber(), binary(), list(emqx:suboption())}]).
subscriptions(SubPid) when is_pid(SubPid) ->
with_subproperty(ets:lookup(mqtt_subscription, SubPid));
subscriptions(SubId) when is_binary(SubId) ->
with_subproperty(ets:match_object(mqtt_subscription, {{SubId, '_'}, '_'}));
subscriptions({SubId, SubPid}) when is_binary(SubId), is_pid(SubPid) ->
with_subproperty(ets:lookup(mqtt_subscription, {SubId, SubPid})).
with_subproperty({Subscriber, {share, _Share, Topic}}) ->
with_subproperty({Subscriber, Topic});
with_subproperty({Subscriber, Topic}) ->
{Subscriber, Topic, ets:lookup_element(mqtt_subproperty, {Topic, Subscriber}, 2)};
with_subproperty(Subscriptions) when is_list(Subscriptions) ->
[with_subproperty(Subscription) || Subscription <- Subscriptions].
-spec(subscribers(binary()) -> list(emqx:subscriber())).
subscribers(Topic) when is_binary(Topic) ->
emqx_pubsub:subscribers(Topic).
-spec(subscribed(binary(), emqx:subscriber()) -> boolean()).
subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) ->
ets:member(mqtt_subproperty, {Topic, SubPid});
subscribed(Topic, SubId) when is_binary(Topic), is_binary(SubId) ->
length(ets:match_object(mqtt_subproperty, {{Topic, {SubId, '_'}}, '_'}, 1)) == 1;
subscribed(Topic, {SubId, SubPid}) when is_binary(Topic), is_binary(SubId), is_pid(SubPid) ->
ets:member(mqtt_subproperty, {Topic, {SubId, SubPid}}).
call(Server, Req) ->
gen_server:call(Server, Req, infinity).
cast(Server, Msg) when is_pid(Server) ->
gen_server:cast(Server, Msg).
pick(SubPid) when is_pid(SubPid) ->
gproc_pool:pick_worker(server, SubPid);
pick(SubId) when is_binary(SubId) ->
gproc_pool:pick_worker(server, SubId);
pick({SubId, SubPid}) when is_binary(SubId), is_pid(SubPid) ->
pick(SubId).
dump() ->
[{Tab, ets:tab2list(Tab)} || Tab <- [mqtt_subproperty, mqtt_subscription, mqtt_subscriber]].
%%--------------------------------------------------------------------
%% gen_server Callbacks
%%--------------------------------------------------------------------
init([Pool, Id, Env]) ->
gproc_pool:connect_worker(Pool, {Pool, Id}),
State = #state{pool = Pool, id = Id, env = Env,
subids = #{}, submon = emqx_pmon:new()},
{ok, State, hibernate}.
handle_call({subscribe, Topic, Subscriber, Options}, _From, State) ->
case do_subscribe(Topic, Subscriber, Options, State) of
{ok, NewState} -> reply(ok, setstats(NewState));
{error, Error} -> reply({error, Error}, State)
end;
handle_call({unsubscribe, Topic, Subscriber}, _From, State) ->
case do_unsubscribe(Topic, Subscriber, State) of
{ok, NewState} -> reply(ok, setstats(NewState));
{error, Error} -> reply({error, Error}, State)
end;
handle_call({setqos, Topic, Subscriber, Qos}, _From, State) ->
Key = {Topic, Subscriber},
case ets:lookup(mqtt_subproperty, Key) of
[{_, Opts}] ->
Opts1 = lists:ukeymerge(1, [{qos, Qos}], Opts),
ets:insert(mqtt_subproperty, {Key, Opts1}),
reply(ok, State);
[] ->
reply({error, {subscription_not_found, Topic}}, State)
end;
handle_call(Req, _From, State) ->
?UNEXPECTED_REQ(Req, State).
handle_cast({subscribe, Topic, Subscriber, Options}, State) ->
case do_subscribe(Topic, Subscriber, Options, State) of
{ok, NewState} -> noreply(setstats(NewState));
{error, _Error} -> noreply(State)
end;
handle_cast({unsubscribe, Topic, Subscriber}, State) ->
case do_unsubscribe(Topic, Subscriber, State) of
{ok, NewState} -> noreply(setstats(NewState));
{error, _Error} -> noreply(State)
end;
handle_cast(Msg, State) ->
?UNEXPECTED_MSG(Msg, State).
handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{subids = SubIds}) ->
case maps:find(DownPid, SubIds) of
{ok, SubId} ->
clean_subscriber({SubId, DownPid});
error ->
clean_subscriber(DownPid)
end,
noreply(setstats(demonitor_subscriber(DownPid, State)));
handle_info(Info, State) ->
?UNEXPECTED_INFO(Info, State).
terminate(_Reason, #state{pool = Pool, id = Id}) ->
gproc_pool:disconnect_worker(Pool, {Pool, Id}).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal Functions
%%--------------------------------------------------------------------
do_subscribe(Topic, Subscriber, Options, State) ->
case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of
[] ->
emqx_pubsub:async_subscribe(Topic, Subscriber, Options),
Share = proplists:get_value(share, Options),
add_subscription(Share, Subscriber, Topic),
ets:insert(mqtt_subproperty, {{Topic, Subscriber}, Options}),
{ok, monitor_subscriber(Subscriber, State)};
[_] ->
{error, {already_subscribed, Topic}}
end.
add_subscription(undefined, Subscriber, Topic) ->
ets:insert(mqtt_subscription, {Subscriber, Topic});
add_subscription(Share, Subscriber, Topic) ->
ets:insert(mqtt_subscription, {Subscriber, {share, Share, Topic}}).
monitor_subscriber(SubPid, State = #state{submon = SubMon}) when is_pid(SubPid) ->
State#state{submon = SubMon:monitor(SubPid)};
monitor_subscriber({SubId, SubPid}, State = #state{subids = SubIds, submon = SubMon}) ->
State#state{subids = maps:put(SubPid, SubId, SubIds), submon = SubMon:monitor(SubPid)}.
do_unsubscribe(Topic, Subscriber, State) ->
case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of
[{_, Options}] ->
emqx_pubsub:async_unsubscribe(Topic, Subscriber, Options),
Share = proplists:get_value(share, Options),
del_subscription(Share, Subscriber, Topic),
ets:delete(mqtt_subproperty, {Topic, Subscriber}),
{ok, State};
[] ->
{error, {subscription_not_found, Topic}}
end.
del_subscription(undefined, Subscriber, Topic) ->
ets:delete_object(mqtt_subscription, {Subscriber, Topic});
del_subscription(Share, Subscriber, Topic) ->
ets:delete_object(mqtt_subscription, {Subscriber, {share, Share, Topic}}).
clean_subscriber(Subscriber) ->
lists:foreach(fun({_, {share, Share, Topic}}) ->
clean_subscriber(Share, Subscriber, Topic);
({_, Topic}) ->
clean_subscriber(undefined, Subscriber, Topic)
end, ets:lookup(mqtt_subscription, Subscriber)),
ets:delete(mqtt_subscription, Subscriber).
clean_subscriber(Share, Subscriber, Topic) ->
case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of
[] ->
%% TODO:....???
Options = if Share == undefined -> []; true -> [{share, Share}] end,
emqx_pubsub:async_unsubscribe(Topic, Subscriber, Options);
[{_, Options}] ->
emqx_pubsub:async_unsubscribe(Topic, Subscriber, Options),
ets:delete(mqtt_subproperty, {Topic, Subscriber})
end.
demonitor_subscriber(SubPid, State = #state{subids = SubIds, submon = SubMon}) ->
State#state{subids = maps:remove(SubPid, SubIds), submon = SubMon:demonitor(SubPid)}.
setstats(State) ->
emqx_stats:setstats('subscriptions/count', 'subscriptions/max',
ets:info(mqtt_subscription, size)), State.
reply(Reply, State) ->
{reply, Reply, State}.
noreply(State) ->
{noreply, State}.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -194,11 +194,11 @@ subscribe(Session, PacketId, TopicTable) -> %%TODO: the ack function??...
-spec(publish(pid(), message()) -> ok | {error, term()}).
publish(_Session, Msg = #mqtt_message{qos = ?QOS_0}) ->
%% Publish QoS0 Directly
emqx_server:publish(Msg), ok;
emqx_broker:publish(Msg), ok;
publish(_Session, Msg = #mqtt_message{qos = ?QOS_1}) ->
%% Publish QoS1 message directly for client will PubAck automatically
emqx_server:publish(Msg), ok;
emqx_broker:publish(Msg), ok;
publish(Session, Msg = #mqtt_message{qos = ?QOS_2}) ->
%% Publish QoS2 to Session
@ -365,7 +365,7 @@ handle_cast({subscribe, From, TopicTable, AckFun},
?LOG(warning, "Duplicated subscribe: ~s, qos = ~w", [Topic, NewQos], State),
SubMap;
{ok, OldQos} ->
emqx_server:setqos(Topic, ClientId, NewQos),
emqx_broker:setopts(Topic, ClientId, [{qos, NewQos}]),
emqx_hooks:run('session.subscribed', [ClientId, Username], {Topic, Opts}),
?LOG(warning, "Duplicated subscribe ~s, old_qos=~w, new_qos=~w",
[Topic, OldQos, NewQos], State),
@ -438,7 +438,7 @@ handle_cast({pubrel, PacketId}, State = #state{awaiting_rel = AwaitingRel}) ->
{Msg, AwaitingRel1} ->
%% Implement Qos2 by method A [MQTT 4.33]
%% Dispatch to subscriber when received PUBREL
spawn(emqx_server, publish, [Msg]), %%:)
emqx_broker:publish(Msg), %% FIXME:
gc(State#state{awaiting_rel = AwaitingRel1});
error ->
?LOG(warning, "Cannot find PUBREL: ~p", [PacketId], State),

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -18,7 +18,9 @@
-behavior(supervisor).
-export([start_link/0, start_session/3]).
-include("emqx.hrl").
-export([start_link/0, start_session_process/1]).
-export([init/1]).
@ -27,10 +29,10 @@
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% @doc Start a session
-spec(start_session(boolean(), {binary(), binary() | undefined} , pid()) -> {ok, pid()}).
start_session(CleanSess, {ClientId, Username}, ClientPid) ->
supervisor:start_child(?MODULE, [CleanSess, {ClientId, Username}, ClientPid]).
%% @doc Start a session process
-spec(start_session_process(session()) -> {ok, pid()}).
start_session_process(Session) ->
supervisor:start_child(?MODULE, [Session]).
%%--------------------------------------------------------------------
%% Supervisor callbacks

171
src/emqx_shared_pubsub.erl Normal file
View File

@ -0,0 +1,171 @@
%%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_shared_pubsub).
-behaviour(gen_server).
-include("emqx.hrl").
%% Mnesia bootstrap
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
%% API
-export([start_link/0]).
-export([strategy/0]).
-export([subscribe/3, unsubscribe/3]).
-export([dispatch/3]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-define(TABLE, shared_subscription).
-record(state, {pmon}).
-record(shared_subscription, {group, topic, subpid}).
%%--------------------------------------------------------------------
%% Mnesia bootstrap
%%--------------------------------------------------------------------
mnesia(boot) ->
ok = ekka_mnesia:create_table(?TABLE, [
{type, bag},
{ram_copies, [node()]},
{record_name, shared_subscription},
{attributes, record_info(fields, shared_subscription)}]);
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?TABLE).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-spec(strategy() -> random | hash).
strategy() ->
application:get_env(emqx, load_balancing_strategy, random).
subscribe(undefined, _Topic, _SubPid) ->
ok;
subscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
mnesia:dirty_write(r(Group, Topic, SubPid)),
gen_server:cast(?SERVER, {monitor, SubPid}).
unsubscribe(undefined, _Topic, _SubPid) ->
ok;
unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
mnesia:dirty_delete_object(r(Group, Topic, SubPid)).
r(Group, Topic, SubPid) ->
#shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
dispatch(Group, Topic, Delivery = #delivery{message = Msg, flows = Flows}) ->
case pick(subscribers(Group, Topic)) of
false -> Delivery;
SubPid -> SubPid ! {dispatch, Topic, Msg},
Delivery#delivery{flows = [{dispatch, {Group, Topic}, 1} | Flows]}
end.
pick([]) ->
false;
pick([SubPid]) ->
SubPid;
pick(SubPids) ->
X = abs(erlang:monotonic_time()
bxor erlang:unique_integer()),
lists:nth((X rem length(SubPids)) + 1, SubPids).
subscribers(Group, Topic) ->
MP = {shared_subscription, Group, Topic, '$1'},
ets:select(shared_subscription, [{MP, [], ['$1']}]).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([]) ->
{atomic, PMon} = mnesia:transaction(fun init_monitors/0),
mnesia:subscribe({table, ?TABLE, simple}),
{ok, #state{pmon = PMon}}.
init_monitors() ->
mnesia:foldl(
fun(#shared_subscription{subpid = SubPid}, Mon) ->
Mon:monitor(SubPid)
end, emqx_pmon:new(), ?TABLE).
handle_call(Req, _From, State) ->
emqx_log:error("[Shared] Unexpected request: ~p", [Req]),
{reply, ignore, State}.
handle_cast({monitor, SubPid}, State= #state{pmon = PMon}) ->
{noreply, State#state{pmon = PMon:monitor(SubPid)}};
handle_cast(Msg, State) ->
emqx_log:error("[Shared] Unexpected msg: ~p", [Msg]),
{noreply, State}.
handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) ->
emqx_log:info("Shared subscription created: ~p", [NewRecord]),
#shared_subscription{subpid = SubPid} = NewRecord,
{noreply, State#state{pmon = PMon:monitor(SubPid)}};
handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) ->
emqx_log:info("Shared subscription deleted: ~p", [OldRecord]),
#shared_subscription{subpid = SubPid} = OldRecord,
{noreply, State#state{pmon = PMon:demonitor(SubPid)}};
handle_info({mnesia_table_event, _Event}, State) ->
{noreply, State};
handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) ->
emqx_log:info("Shared subscription down: ~p", [SubPid]),
mnesia:transaction(fun clean_down/1, [SubPid]),
{noreply, State#state{pmon = PMon:erase(SubPid)}};
handle_info(Info, State) ->
emqx_log:error("[Shared] Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
mnesia:unsubscribe({table, ?TABLE, simple}).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
clean_down(SubPid) ->
MP = #shared_subscription{_ = '_', subpid = SubPid},
lists:foreach(fun mnesia:delete_object/1, mnesia:match_object(MP)).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -29,7 +29,7 @@
%% API Function Exports
-export([start_link/2]).
-export([start_session/2, lookup_session/1, register_session/3,
-export([open_session/1, start_session/2, lookup_session/1, register_session/3,
unregister_session/1, unregister_session/2]).
-export([dispatch/3]).
@ -47,7 +47,7 @@
-define(TIMEOUT, 120000).
-define(LOG(Level, Format, Args, Session),
lager:Level("SM(~s): " ++ Format, [Session#mqtt_session.client_id | Args])).
lager:Level("SM(~s): " ++ Format, [Session#session.client_id | Args])).
%%--------------------------------------------------------------------
%% Mnesia callbacks
@ -55,7 +55,7 @@
mnesia(boot) ->
%% Global Session Table
ok = ekka_mnesia:create_table(mqtt_session, [
ok = ekka_mnesia:create_table(session, [
{type, set},
{ram_copies, [node()]},
{record_name, mqtt_session},
@ -68,6 +68,49 @@ mnesia(copy) ->
%% API
%%--------------------------------------------------------------------
%% Open a clean start session.
open_session(Session = #{client_id := ClientId, clean_start := true, expiry_interval := Interval}) ->
with_lock(ClientId,
fun() ->
{ResL, BadNodes} = emqx_rpc:multicall(ekka:nodelist(), ?MODULE, discard_session, [ClientId]),
io:format("ResL: ~p, BadNodes: ~p~n", [ResL, BadNodes]),
case Interval > 0 of
true ->
{ok, emqx_session_sup:start_session_process(Session)};
false ->
{ok, emqx_session:init_state(Session)}
end
end).
open_session(Session = #{client_id := ClientId, clean_start := false, expiry_interval := Interval}) ->
with_lock(ClientId,
fun() ->
{ResL, BadNodes} = emqx_rpc:multicall(ekka:nodelist(), ?MODULE, lookup_session, [ClientId]),
[SessionPid | _] = lists:flatten(ResL),
end).
lookup_session(ClientId) ->
ets:lookup(session, ClientId).
lookup_session(ClientId) ->
ets:lookup(session, ClientId).
with_lock(undefined, Fun) ->
Fun();
with_lock(ClientId, Fun) ->
case emqx_sm_locker:lock(ClientId) of
true -> Result = Fun(),
ok = emqx_sm_locker:unlock(ClientId),
Result;
false -> {error, client_id_unavailable}
end.
%% @doc Start a session manager
-spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}).
start_link(Pool, Id) ->
@ -92,7 +135,7 @@ lookup_session(ClientId) ->
register_session(ClientId, CleanSess, Properties) ->
ets:insert(mqtt_local_session, {ClientId, self(), CleanSess, Properties}).
%% @doc Unregister a session.
%% @doc Unregister a Session.
-spec(unregister_session(binary()) -> boolean()).
unregister_session(ClientId) ->
unregister_session(ClientId, self()).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

43
src/emqx_sm_locker.erl Normal file
View File

@ -0,0 +1,43 @@
%%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_sm_locker).
-include("emqx.hrl").
-export([start_link/0]).
%% Lock/Unlock API based on canal-lock.
-export([lock/1, unlock/1]).
%% @doc Starts the lock server
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
start_link() ->
canal_lock:start_link(?MODULE, 1).
%% @doc Lock a clientid
-spec(lock(client_id()) -> boolean()).
lock(ClientId) ->
case canal_lock:acquire(?MODULE, ClientId, 1, 1) of
{acquired, 1} -> true;
full -> false
end.
%% @doc Unlock a clientid
-spec(unlock(client_id()) -> ok).
unlock(ClientId) ->
canal_lock:release(?MODULE, ClientId, 1, 1).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -30,7 +30,7 @@
set_session_stats/2, get_session_stats/1, del_session_stats/1]).
%% Statistics API.
-export([statsfun/1, statsfun/2, getstats/0, getstat/1, setstat/2, setstats/3]).
-export([statsfun/1, statsfun/2, getstats/0, getstat/1, setstat/2, setstat/3]).
%% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@ -125,9 +125,9 @@ statsfun(Stat) ->
-spec(statsfun(Stat :: atom(), MaxStat :: atom()) -> fun()).
statsfun(Stat, MaxStat) ->
fun(Val) -> setstats(Stat, MaxStat, Val) end.
fun(Val) -> setstat(Stat, MaxStat, Val) end.
%% @doc Get broker statistics
%% @doc Get all statistics
-spec(getstats() -> [{atom(), non_neg_integer()}]).
getstats() ->
lists:sort(ets:tab2list(?STATS_TAB)).
@ -146,9 +146,9 @@ setstat(Stat, Val) ->
ets:update_element(?STATS_TAB, Stat, {2, Val}).
%% @doc Set stats with max
-spec(setstats(Stat :: atom(), MaxStat :: atom(), Val :: pos_integer()) -> boolean()).
setstats(Stat, MaxStat, Val) ->
gen_server:cast(?MODULE, {setstats, Stat, MaxStat, Val}).
-spec(setstat(Stat :: atom(), MaxStat :: atom(), Val :: pos_integer()) -> boolean()).
setstat(Stat, MaxStat, Val) ->
gen_server:cast(?MODULE, {setstat, Stat, MaxStat, Val}).
%%--------------------------------------------------------------------
%% gen_server callbacks
@ -172,7 +172,7 @@ handle_call(_Request, _From, State) ->
{reply, error, State}.
%% atomic
handle_cast({setstats, Stat, MaxStat, Val}, State) ->
handle_cast({setstat, Stat, MaxStat, Val}, State) ->
MaxVal = ets:lookup_element(?STATS_TAB, MaxStat, 2),
if
Val > MaxVal ->

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -34,7 +34,6 @@
start_link() ->
supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []).
-spec(start_child(supervisor:child_spec()) -> startchild_ret()).
start_child(ChildSpec) when is_tuple(ChildSpec) ->
supervisor:start_child(?SUPERVISOR, ChildSpec).
@ -58,17 +57,16 @@ init([]) ->
{ok, {{one_for_all, 0, 1},
[?CHILD(emqx_ctl, worker),
?CHILD(emqx_hooks, worker),
?CHILD(emqx_router, worker),
?CHILD(emqx_pubsub_sup, supervisor),
?CHILD(emqx_stats, worker),
?CHILD(emqx_metrics, worker),
?CHILD(emqx_router_sup, supervisor),
?CHILD(emqx_broker_sup, supervisor),
?CHILD(emqx_pooler, supervisor),
?CHILD(emqx_trace_sup, supervisor),
?CHILD(emqx_cm_sup, supervisor),
?CHILD(emqx_sm_sup, supervisor),
?CHILD(emqx_session_sup, supervisor),
?CHILD(emqx_ws_client_sup, supervisor),
?CHILD(emqx_broker, worker),
?CHILD(emqx_ws_connection_sup, supervisor),
?CHILD(emqx_alarm, worker),
?CHILD(emqx_mod_sup, supervisor),
?CHILD(emqx_bridge_sup_sup, supervisor),

177
src/emqx_sys.erl Normal file
View File

@ -0,0 +1,177 @@
%%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_sys).
-behaviour(gen_server).
-include("emqx.hrl").
-export([start_link/0]).
-export([schedulers/0]).
-export([version/0, uptime/0, datetime/0, sysdescr/0, sys_interval/0]).
-export([info/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {started_at, heartbeat, sys_ticker, version, sysdescr}).
-define(APP, emqx).
-define(SERVER, ?MODULE).
%% $SYS Topics of Broker
-define(SYSTOP_BROKERS, [
version, % Broker version
uptime, % Broker uptime
datetime, % Broker local datetime
sysdescr % Broker description
]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%% @doc Get schedulers
-spec(schedulers() -> pos_integer()).
schedulers() ->
erlang:system_info(schedulers).
%% @doc Get sys version
-spec(version() -> string()).
version() ->
{ok, Version} = application:get_key(?APP, vsn), Version.
%% @doc Get sys description
-spec(sysdescr() -> string()).
sysdescr() ->
{ok, Descr} = application:get_key(?APP, description), Descr.
%% @doc Get sys uptime
-spec(uptime() -> string()).
uptime() -> gen_server:call(?SERVER, uptime).
%% @doc Get sys datetime
-spec(datetime() -> string()).
datetime() ->
{{Y, M, D}, {H, MM, S}} = calendar:local_time(),
lists:flatten(
io_lib:format(
"~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])).
sys_interval() ->
application:get_env(?APP, sys_interval, 60000).
%% @doc Get sys info
-spec(info() -> list(tuple())).
info() ->
[{version, version()},
{sysdescr, sysdescr()},
{uptime, uptime()},
{datetime, datetime()}].
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([]) ->
Tick = fun(I, M) ->
{ok, TRef} = timer:send_interval(I, M), TRef
end,
{ok, #state{started_at = os:timestamp(),
heartbeat = Tick(1000, heartbeat),
sys_ticker = Tick(sys_interval(), tick),
version = iolist_to_binary(version()),
sysdescr = iolist_to_binary(sysdescr())}, hibernate}.
handle_call(uptime, _From, State) ->
{reply, uptime(State), State};
handle_call(Req, _From, State) ->
emqx_log:error("[SYS] Unexpected request: ~p", [Req]),
{reply, ignore, State}.
handle_cast(Msg, State) ->
emqx_log:error("[SYS] Unexpected msg: ~p", [Msg]),
{noreply, State}.
handle_info(heartbeat, State) ->
publish(uptime, iolist_to_binary(uptime(State))),
publish(datetime, iolist_to_binary(datetime())),
{noreply, State, hibernate};
handle_info(tick, State = #state{version = Version, sysdescr = Descr}) ->
retain(brokers),
retain(version, Version),
retain(sysdescr, Descr),
{noreply, State, hibernate};
handle_info(Info, State) ->
emqx_log:error("[SYS] Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #state{heartbeat = Hb, sys_ticker = TRef}) ->
timer:cancel(Hb),
timer:cancel(TRef).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
retain(brokers) ->
Payload = list_to_binary(string:join([atom_to_list(N) ||
N <- ekka_mnesia:running_nodes()], ",")),
Msg = emqx_message:make(broker, <<"$SYS/brokers">>, Payload),
emqx:publish(emqx_message:set_flag(sys, emqx_message:set_flag(retain, Msg))).
retain(Topic, Payload) when is_binary(Payload) ->
Msg = emqx_message:make(broker, emqx_topic:systop(Topic), Payload),
emqx:publish(emqx_message:set_flag(sys, emqx_message:set_flag(retain, Msg))).
publish(Topic, Payload) when is_binary(Payload) ->
Msg = emqx_message:make(broker, emqx_topic:systop(Topic), Payload),
emqx:publish(emqx_message:set_flag(sys, Msg)).
uptime(#state{started_at = Ts}) ->
Secs = timer:now_diff(os:timestamp(), Ts) div 1000000,
lists:flatten(uptime(seconds, Secs)).
uptime(seconds, Secs) when Secs < 60 ->
[integer_to_list(Secs), " seconds"];
uptime(seconds, Secs) ->
[uptime(minutes, Secs div 60), integer_to_list(Secs rem 60), " seconds"];
uptime(minutes, M) when M < 60 ->
[integer_to_list(M), " minutes, "];
uptime(minutes, M) ->
[uptime(hours, M div 60), integer_to_list(M rem 60), " minutes, "];
uptime(hours, H) when H < 24 ->
[integer_to_list(H), " hours, "];
uptime(hours, H) ->
[uptime(days, H div 24), integer_to_list(H rem 24), " hours, "];
uptime(days, D) ->
[integer_to_list(D), " days,"].

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -16,6 +16,8 @@
-module(emqx_topic).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-import(lists, [reverse/1]).
@ -26,9 +28,7 @@
-export([parse/1, parse/2]).
-type(topic() :: binary()).
-type(option() :: local | {qos, mqtt_qos()} | {share, '$queue' | binary()}).
-type(option() :: {qos, mqtt_qos()} | {share, '$queue' | binary()}).
-type(word() :: '' | '+' | '#' | binary()).
@ -36,7 +36,7 @@
-type(triple() :: {root | binary(), word(), binary()}).
-export_type([topic/0, option/0, word/0, triple/0]).
-export_type([option/0, word/0, triple/0]).
-define(MAX_TOPIC_LEN, 4096).
@ -101,7 +101,7 @@ validate2([''|Words]) ->
validate2(['+'|Words]) ->
validate2(Words);
validate2([W|Words]) ->
case validate3(W) of true -> validate2(Words); false -> false end.
validate3(W) andalso validate2(Words).
validate3(<<>>) ->
true;
@ -177,39 +177,24 @@ join(Words) ->
parse(Topic) when is_binary(Topic) ->
parse(Topic, []).
parse(<<"$local/", Topic1/binary>>, Options) ->
if_not_contain(local, Options, fun() ->
parse(Topic1, [local | Options])
end);
parse(<<"$fastlane/", Topic1/binary>>, Options) ->
if_not_contain(fastlane, Options, fun() ->
parse(Topic1, [fastlane | Options])
end);
parse(<<"$queue/", Topic1/binary>>, Options) ->
if_not_contain(share, Options,fun() ->
parse(Topic1, [{share, '$queue'} | Options])
end);
parse(<<"$share/", Topic1/binary>>, Options) ->
if_not_contain(share, Options, fun() ->
[Share, Topic2] = binary:split(Topic1, <<"/">>),
{Topic2, [{share, Share} | Options]}
end);
parse(Topic, Options) ->
{Topic, Options}.
if_not_contain(Key, Options, Fun) when Key == local; Key == fastlane ->
case lists:member(Key, Options) of
true -> error(invalid_topic);
false -> Fun()
parse(Topic = <<"$fastlane/", Topic1/binary>>, Options) ->
case lists:member(fastlane, Options) of
true -> error({invalid_topic, Topic});
false -> parse(Topic1, [fastlane | Options])
end;
if_not_contain(share, Options, Fun) ->
parse(Topic = <<"$queue/", Topic1/binary>>, Options) ->
case lists:keyfind(share, 1, Options) of
true -> error(invalid_topic);
false -> Fun()
end.
{share, _} -> error({invalid_topic, Topic});
false -> parse(Topic1, [{share, '$queue'} | Options])
end;
parse(Topic = <<"$share/", Topic1/binary>>, Options) ->
case lists:keyfind(share, 1, Options) of
{share, _} -> error({invalid_topic, Topic});
false -> [Group, Topic2] = binary:split(Topic1, <<"/">>),
{Topic2, [{share, Group} | Options]}
end;
parse(Topic, Options) -> {Topic, Options}.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -14,13 +14,16 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_trace).
-module(emqx_tracer).
-behaviour(gen_server).
%% API Function Exports
-include("emqx.hrl").
-export([start_link/0]).
-export([trace/3]).
-export([start_trace/2, stop_trace/1, all_traces/0]).
%% gen_server Function Exports
@ -31,16 +34,36 @@
-type(trace_who() :: {client | topic, binary()}).
-define(TRACE_OPTIONS, [{formatter_config, [time, " [",severity,"] ", message, "\n"]}]).
-define(OPTIONS, [{formatter_config, [time, " [",severity,"] ", message, "\n"]}]).
%%--------------------------------------------------------------------
%% API
%% Start the tracer
%%--------------------------------------------------------------------
-spec(start_link() -> {ok, pid()}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
%%--------------------------------------------------------------------
%% Trace
%%--------------------------------------------------------------------
trace(publish, From, _Msg) when is_atom(From) ->
%% Dont' trace '$SYS' publish
ignore;
trace(publish, {ClientId, Username}, #message{topic = Topic, payload = Payload}) ->
lager:info([{client, ClientId}, {topic, Topic}],
"~s/~s PUBLISH to ~s: ~p", [ClientId, Username, Topic, Payload]);
trace(publish, From, #message{topic = Topic, payload = Payload})
when is_binary(From); is_list(From) ->
lager:info([{client, From}, {topic, Topic}],
"~s PUBLISH to ~s: ~p", [From, Topic, Payload]).
%%--------------------------------------------------------------------
%% Start/Stop Trace
%%--------------------------------------------------------------------
%% @doc Start to trace client or topic.
-spec(start_trace(trace_who(), string()) -> ok | {error, term()}).
start_trace({client, ClientId}, LogFile) ->
@ -67,10 +90,10 @@ all_traces() -> gen_server:call(?MODULE, all_traces).
%%--------------------------------------------------------------------
init([]) ->
{ok, #state{level = debug, traces = #{}}}.
{ok, #state{level = emqx:env(trace_level, debug), traces = #{}}}.
handle_call({start_trace, Who, LogFile}, _From, State = #state{level = Level, traces = Traces}) ->
case lager:trace_file(LogFile, [Who], Level, ?TRACE_OPTIONS) of
case lager:trace_file(LogFile, [Who], Level, ?OPTIONS) of
{ok, exists} ->
{reply, {error, existed}, State};
{ok, Trace} ->
@ -96,15 +119,15 @@ handle_call(all_traces, _From, State = #state{traces = Traces}) ->
<- maps:to_list(Traces)], State};
handle_call(Req, _From, State) ->
lager:error("[TRACE] Unexpected Call: ~p", [Req]),
emqx_log:error("[TRACE] Unexpected Call: ~p", [Req]),
{reply, ignore, State}.
handle_cast(Msg, State) ->
lager:error("[TRACE] Unexpected Cast: ~p", [Msg]),
emqx_log:error("[TRACE] Unexpected Cast: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
lager:error("[TRACE] Unexpected Info: ~p", [Info]),
emqx_log:error("[TRACE] Unexpected Info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -18,7 +18,7 @@
-include("emqx.hrl").
%% Mnesia Callbacks
%% Mnesia bootstrap
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
@ -28,10 +28,10 @@
-export([insert/1, match/1, lookup/1, delete/1]).
%%--------------------------------------------------------------------
%% Mnesia Callbacks
%% Mnesia Bootstrap
%%--------------------------------------------------------------------
%% @doc Create or Replicate trie tables.
%% @doc Create or replicate trie tables.
-spec(mnesia(boot | copy) -> ok).
mnesia(boot) ->
%% Trie Table
@ -55,8 +55,8 @@ mnesia(copy) ->
%% Trie API
%%--------------------------------------------------------------------
%% @doc Insert topic to trie
-spec(insert(Topic :: binary()) -> ok).
%% @doc Insert a topic into the trie
-spec(insert(Topic :: topic()) -> ok).
insert(Topic) when is_binary(Topic) ->
case mnesia:read(trie_node, Topic) of
[#trie_node{topic = Topic}] ->
@ -64,25 +64,25 @@ insert(Topic) when is_binary(Topic) ->
[TrieNode = #trie_node{topic = undefined}] ->
write_trie_node(TrieNode#trie_node{topic = Topic});
[] ->
% Add trie path
%% Add trie path
lists:foreach(fun add_path/1, emqx_topic:triples(Topic)),
% Add last node
%% Add last node
write_trie_node(#trie_node{node_id = Topic, topic = Topic})
end.
%% @doc Find trie nodes that match topic
-spec(match(Topic :: binary()) -> list(MatchedTopic :: binary())).
%% @doc Find trie nodes that match the topic
-spec(match(Topic :: topic()) -> list(MatchedTopic :: topic())).
match(Topic) when is_binary(Topic) ->
TrieNodes = match_node(root, emqx_topic:words(Topic)),
[Name || #trie_node{topic = Name} <- TrieNodes, Name =/= undefined].
%% @doc Lookup a Trie Node
%% @doc Lookup a trie node
-spec(lookup(NodeId :: binary()) -> [#trie_node{}]).
lookup(NodeId) ->
mnesia:read(trie_node, NodeId).
%% @doc Delete topic from trie
-spec(delete(Topic :: binary()) -> ok).
%% @doc Delete a topic from the trie
-spec(delete(Topic :: topic()) -> ok).
delete(Topic) when is_binary(Topic) ->
case mnesia:read(trie_node, Topic) of
[#trie_node{edge_count = 0}] ->
@ -95,11 +95,11 @@ delete(Topic) when is_binary(Topic) ->
end.
%%--------------------------------------------------------------------
%% Internal Functions
%% Internal functions
%%--------------------------------------------------------------------
%% @private
%% @doc Add path to trie tree.
%% @doc Add a path to the trie.
add_path({Node, Word, Child}) ->
Edge = #trie_edge{node_id = Node, word = Word},
case mnesia:read(trie_node, Node) of
@ -146,7 +146,7 @@ match_node(NodeId, [W|Words], ResAcc) ->
end.
%% @private
%% @doc Delete paths from trie tree.
%% @doc Delete paths from the trie.
delete_path([]) ->
ok;
delete_path([{NodeId, Word, _} | RestPath]) ->

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ws_conn).
-module(emqx_ws_connection).
-behaviour(gen_server).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ws_conn_sup).
-module(emqx_ws_connection_sup).
-behavior(supervisor).
@ -39,6 +39,6 @@ init([]) ->
%%TODO: Cannot upgrade the environments, Use zone?
Env = lists:append(emqx:env(client, []), emqx:env(protocol, [])),
{ok, {{simple_one_for_one, 0, 1},
[{ws_conn, {emqx_ws_conn, start_link, [Env]},
temporary, 5000, worker, [emqx_ws_conn]}]}}.
[{ws_connection, {emqx_ws_connection, start_link, [Env]},
temporary, 5000, worker, [emqx_ws_connection]}]}}.