-spec().
This commit is contained in:
parent
17f40f458f
commit
50a11ce6c9
|
@ -47,15 +47,15 @@
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Start access control server.
|
||||
-spec start_link() -> {ok, pid()} | ignore | {error, any()}.
|
||||
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
|
||||
start_link() -> start_link(emqttd:env(access)).
|
||||
|
||||
-spec start_link(Opts :: list()) -> {ok, pid()} | ignore | {error, any()}.
|
||||
-spec(start_link(Opts :: list()) -> {ok, pid()} | ignore | {error, any()}).
|
||||
start_link(Opts) ->
|
||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [Opts], []).
|
||||
|
||||
%% @doc Authenticate MQTT Client.
|
||||
-spec auth(Client :: mqtt_client(), Password :: password()) -> ok | {error, any()}.
|
||||
-spec(auth(Client :: mqtt_client(), Password :: password()) -> ok | {error, any()}).
|
||||
auth(Client, Password) when is_record(Client, mqtt_client) ->
|
||||
auth(Client, Password, lookup_mods(auth)).
|
||||
auth(_Client, _Password, []) ->
|
||||
|
@ -69,10 +69,10 @@ auth(Client, Password, [{Mod, State, _Seq} | Mods]) ->
|
|||
end.
|
||||
|
||||
%% @doc Check ACL
|
||||
-spec check_acl(Client, PubSub, Topic) -> allow | deny when
|
||||
-spec(check_acl(Client, PubSub, Topic) -> allow | deny when
|
||||
Client :: mqtt_client(),
|
||||
PubSub :: pubsub(),
|
||||
Topic :: binary().
|
||||
Topic :: binary()).
|
||||
check_acl(Client, PubSub, Topic) when ?IS_PUBSUB(PubSub) ->
|
||||
case lookup_mods(acl) of
|
||||
[] -> allow;
|
||||
|
@ -89,26 +89,26 @@ check_acl(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) ->
|
|||
end.
|
||||
|
||||
%% @doc Reload ACL Rules.
|
||||
-spec reload_acl() -> list(ok | {error, any()}).
|
||||
-spec(reload_acl() -> list(ok | {error, any()})).
|
||||
reload_acl() ->
|
||||
[Mod:reload_acl(State) || {Mod, State, _Seq} <- lookup_mods(acl)].
|
||||
|
||||
%% @doc Register Authentication or ACL module.
|
||||
-spec register_mod(auth | acl, atom(), list()) -> ok | {error, any()}.
|
||||
-spec(register_mod(auth | acl, atom(), list()) -> ok | {error, any()}).
|
||||
register_mod(Type, Mod, Opts) when Type =:= auth; Type =:= acl->
|
||||
register_mod(Type, Mod, Opts, 0).
|
||||
|
||||
-spec register_mod(auth | acl, atom(), list(), non_neg_integer()) -> ok | {error, any()}.
|
||||
-spec(register_mod(auth | acl, atom(), list(), non_neg_integer()) -> ok | {error, any()}).
|
||||
register_mod(Type, Mod, Opts, Seq) when Type =:= auth; Type =:= acl->
|
||||
gen_server:call(?SERVER, {register_mod, Type, Mod, Opts, Seq}).
|
||||
|
||||
%% @doc Unregister authentication or ACL module
|
||||
-spec unregister_mod(Type :: auth | acl, Mod :: atom()) -> ok | {error, any()}.
|
||||
-spec(unregister_mod(Type :: auth | acl, Mod :: atom()) -> ok | {error, any()}).
|
||||
unregister_mod(Type, Mod) when Type =:= auth; Type =:= acl ->
|
||||
gen_server:call(?SERVER, {unregister_mod, Type, Mod}).
|
||||
|
||||
%% @doc Lookup authentication or ACL modules.
|
||||
-spec lookup_mods(auth | acl) -> list().
|
||||
-spec(lookup_mods(auth | acl) -> list()).
|
||||
lookup_mods(Type) ->
|
||||
case ets:lookup(?ACCESS_CONTROL_TAB, tab_key(Type)) of
|
||||
[] -> [];
|
||||
|
|
|
@ -85,7 +85,7 @@ bin(B) when is_binary(B) ->
|
|||
B.
|
||||
|
||||
%% @doc Match Access Rule
|
||||
-spec match(mqtt_client(), topic(), rule()) -> {matched, allow} | {matched, deny} | nomatch.
|
||||
-spec(match(mqtt_client(), topic(), rule()) -> {matched, allow} | {matched, deny} | nomatch).
|
||||
match(_Client, _Topic, {AllowDeny, all}) when (AllowDeny =:= allow) orelse (AllowDeny =:= deny) ->
|
||||
{matched, AllowDeny};
|
||||
match(Client, Topic, {AllowDeny, Who, _PubSub, TopicFilters})
|
||||
|
|
|
@ -34,7 +34,7 @@
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Read all rules
|
||||
-spec all_rules() -> list(emqttd_access_rule:rule()).
|
||||
-spec(all_rules() -> list(emqttd_access_rule:rule())).
|
||||
all_rules() ->
|
||||
case ets:lookup(?ACL_RULE_TAB, all_rules) of
|
||||
[] -> [];
|
||||
|
@ -46,7 +46,7 @@ all_rules() ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Init internal ACL
|
||||
-spec init(AclOpts :: list()) -> {ok, State :: any()}.
|
||||
-spec(init(AclOpts :: list()) -> {ok, State :: any()}).
|
||||
init(AclOpts) ->
|
||||
ets:new(?ACL_RULE_TAB, [set, public, named_table, {read_concurrency, true}]),
|
||||
AclFile = proplists:get_value(file, AclOpts),
|
||||
|
@ -78,11 +78,11 @@ filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) ->
|
|||
false.
|
||||
|
||||
%% @doc Check ACL
|
||||
-spec check_acl({Client, PubSub, Topic}, State) -> allow | deny | ignore when
|
||||
-spec(check_acl({Client, PubSub, Topic}, State) -> allow | deny | ignore when
|
||||
Client :: mqtt_client(),
|
||||
PubSub :: pubsub(),
|
||||
Topic :: binary(),
|
||||
State :: #state{}.
|
||||
State :: #state{}).
|
||||
check_acl({Client, PubSub, Topic}, #state{nomatch = Default}) ->
|
||||
case match(Client, Topic, lookup(PubSub)) of
|
||||
{matched, allow} -> allow;
|
||||
|
@ -106,7 +106,7 @@ match(Client, Topic, [Rule|Rules]) ->
|
|||
end.
|
||||
|
||||
%% @doc Reload ACL
|
||||
-spec reload_acl(State :: #state{}) -> ok | {error, Reason :: any()}.
|
||||
-spec(reload_acl(State :: #state{}) -> ok | {error, Reason :: any()}).
|
||||
reload_acl(State) ->
|
||||
case catch load_rules_from_file(State) of
|
||||
{'EXIT', Error} -> {error, Error};
|
||||
|
@ -114,6 +114,6 @@ reload_acl(State) ->
|
|||
end.
|
||||
|
||||
%% @doc ACL Module Description
|
||||
-spec description() -> string().
|
||||
-spec(description() -> string()).
|
||||
description() -> "Internal ACL with etc/acl.config".
|
||||
|
||||
|
|
|
@ -54,15 +54,15 @@ alarm_fun(Bool) ->
|
|||
(clear, _AlarmId) when Bool =:= false -> alarm_fun(false)
|
||||
end.
|
||||
|
||||
-spec set_alarm(mqtt_alarm()) -> ok.
|
||||
-spec(set_alarm(mqtt_alarm()) -> ok).
|
||||
set_alarm(Alarm) when is_record(Alarm, mqtt_alarm) ->
|
||||
gen_event:notify(?ALARM_MGR, {set_alarm, Alarm}).
|
||||
|
||||
-spec clear_alarm(any()) -> ok.
|
||||
-spec(clear_alarm(any()) -> ok).
|
||||
clear_alarm(AlarmId) when is_binary(AlarmId) ->
|
||||
gen_event:notify(?ALARM_MGR, {clear_alarm, AlarmId}).
|
||||
|
||||
-spec get_alarms() -> list(mqtt_alarm()).
|
||||
-spec(get_alarms() -> list(mqtt_alarm())).
|
||||
get_alarms() ->
|
||||
gen_event:call(?ALARM_MGR, ?MODULE, get_alarms).
|
||||
|
||||
|
|
|
@ -39,11 +39,11 @@
|
|||
%% Application callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec start(StartType, StartArgs) -> {ok, pid()} | {ok, pid(), State} | {error, Reason} when
|
||||
StartType :: normal | {takeover, node()} | {failover, node()},
|
||||
StartArgs :: term(),
|
||||
State :: term(),
|
||||
Reason :: term().
|
||||
-spec(start(StartType, StartArgs) -> {ok, pid()} | {ok, pid(), State} | {error, Reason} when
|
||||
StartType :: normal | {takeover, node()} | {failover, node()},
|
||||
StartArgs :: term(),
|
||||
State :: term(),
|
||||
Reason :: term()).
|
||||
start(_StartType, _StartArgs) ->
|
||||
print_banner(),
|
||||
emqttd_mnesia:start(),
|
||||
|
@ -57,7 +57,7 @@ start(_StartType, _StartArgs) ->
|
|||
print_vsn(),
|
||||
{ok, Sup}.
|
||||
|
||||
-spec stop(State :: term()) -> term().
|
||||
-spec(stop(State :: term()) -> term()).
|
||||
stop(_State) ->
|
||||
catch stop_listeners().
|
||||
|
||||
|
@ -159,7 +159,7 @@ load_mod({Name, Opts}) ->
|
|||
end.
|
||||
|
||||
%% @doc Is module enabled?
|
||||
-spec is_mod_enabled(Name :: atom()) -> boolean().
|
||||
-spec(is_mod_enabled(Name :: atom()) -> boolean()).
|
||||
is_mod_enabled(Name) -> emqttd:env(modules, Name) =/= undefined.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -167,11 +167,11 @@ is_mod_enabled(Name) -> emqttd:env(modules, Name) =/= undefined.
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Start Listeners of the broker.
|
||||
-spec start_listeners() -> any().
|
||||
-spec(start_listeners() -> any()).
|
||||
start_listeners() -> lists:foreach(fun start_listener/1, emqttd:env(listeners)).
|
||||
|
||||
%% Start mqtt listener
|
||||
-spec start_listener(listener()) -> any().
|
||||
-spec(start_listener(listener()) -> any()).
|
||||
start_listener({mqtt, Port, Opts}) -> start_listener(mqtt, Port, Opts);
|
||||
|
||||
%% Start mqtt(SSL) listener
|
||||
|
|
|
@ -35,28 +35,28 @@
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Add clientid
|
||||
-spec add_clientid(binary()) -> {atomic, ok} | {aborted, any()}.
|
||||
-spec(add_clientid(binary()) -> {atomic, ok} | {aborted, any()}).
|
||||
add_clientid(ClientId) when is_binary(ClientId) ->
|
||||
R = #mqtt_auth_clientid{client_id = ClientId},
|
||||
mnesia:transaction(fun mnesia:write/1, [R]).
|
||||
|
||||
%% @doc Add clientid with password
|
||||
-spec add_clientid(binary(), binary()) -> {atomic, ok} | {aborted, any()}.
|
||||
-spec(add_clientid(binary(), binary()) -> {atomic, ok} | {aborted, any()}).
|
||||
add_clientid(ClientId, Password) ->
|
||||
R = #mqtt_auth_clientid{client_id = ClientId, password = Password},
|
||||
mnesia:transaction(fun mnesia:write/1, [R]).
|
||||
|
||||
%% @doc Lookup clientid
|
||||
-spec lookup_clientid(binary()) -> list(#mqtt_auth_clientid{}).
|
||||
-spec(lookup_clientid(binary()) -> list(#mqtt_auth_clientid{})).
|
||||
lookup_clientid(ClientId) ->
|
||||
mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId).
|
||||
|
||||
%% @doc Lookup all clientids
|
||||
-spec all_clientids() -> list(binary()).
|
||||
-spec(all_clientids() -> list(binary())).
|
||||
all_clientids() -> mnesia:dirty_all_keys(?AUTH_CLIENTID_TAB).
|
||||
|
||||
%% @doc Remove clientid
|
||||
-spec remove_clientid(binary()) -> {atomic, ok} | {aborted, any()}.
|
||||
-spec(remove_clientid(binary()) -> {atomic, ok} | {aborted, any()}).
|
||||
remove_clientid(ClientId) ->
|
||||
mnesia:transaction(fun mnesia:delete/1, [{?AUTH_CLIENTID_TAB, ClientId}]).
|
||||
|
||||
|
|
|
@ -50,7 +50,7 @@ behaviour_info(_Other) ->
|
|||
-endif.
|
||||
|
||||
%% @doc Password Hash
|
||||
-spec passwd_hash(hash_type(), binary()) -> binary().
|
||||
-spec(passwd_hash(hash_type(), binary()) -> binary()).
|
||||
passwd_hash(plain, Password) ->
|
||||
Password;
|
||||
passwd_hash(md5, Password) ->
|
||||
|
|
|
@ -72,7 +72,7 @@ is_enabled() ->
|
|||
lists:member(?AUTH_USERNAME_TAB, mnesia:system_info(tables)).
|
||||
|
||||
%% @doc Add User
|
||||
-spec add_user(binary(), binary()) -> ok | {error, any()}.
|
||||
-spec(add_user(binary(), binary()) -> ok | {error, any()}).
|
||||
add_user(Username, Password) ->
|
||||
User = #?AUTH_USERNAME_TAB{username = Username, password = hash(Password)},
|
||||
ret(mnesia:transaction(fun mnesia:write/1, [User])).
|
||||
|
@ -81,12 +81,12 @@ add_default_user(Username, Password) ->
|
|||
add_user(iolist_to_binary(Username), iolist_to_binary(Password)).
|
||||
|
||||
%% @doc Lookup user by username
|
||||
-spec lookup_user(binary()) -> list().
|
||||
-spec(lookup_user(binary()) -> list()).
|
||||
lookup_user(Username) ->
|
||||
mnesia:dirty_read(?AUTH_USERNAME_TAB, Username).
|
||||
|
||||
%% @doc Remove user
|
||||
-spec remove_user(binary()) -> ok | {error, any()}.
|
||||
-spec(remove_user(binary()) -> ok | {error, any()}).
|
||||
remove_user(Username) ->
|
||||
ret(mnesia:transaction(fun mnesia:delete/1, [{?AUTH_USERNAME_TAB, Username}])).
|
||||
|
||||
|
@ -94,7 +94,7 @@ ret({atomic, ok}) -> ok;
|
|||
ret({aborted, Error}) -> {error, Error}.
|
||||
|
||||
%% @doc All usernames
|
||||
-spec all_users() -> list().
|
||||
-spec(all_users() -> list()).
|
||||
all_users() -> mnesia:dirty_all_keys(?AUTH_USERNAME_TAB).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -124,17 +124,17 @@ add_subscription(Subscription = #mqtt_subscription{subid = SubId, topic = Topic}
|
|||
end)).
|
||||
|
||||
%% @doc Lookup static subscriptions.
|
||||
-spec lookup_subscriptions(binary()) -> list(mqtt_subscription()).
|
||||
-spec(lookup_subscriptions(binary()) -> list(mqtt_subscription())).
|
||||
lookup_subscriptions(ClientId) when is_binary(ClientId) ->
|
||||
mnesia:dirty_read(backend_subscription, ClientId).
|
||||
|
||||
%% @doc Delete static subscriptions by ClientId manually.
|
||||
-spec del_subscriptions(binary()) -> ok.
|
||||
-spec(del_subscriptions(binary()) -> ok).
|
||||
del_subscriptions(ClientId) when is_binary(ClientId) ->
|
||||
return(mnesia:transaction(fun mnesia:delete/1, [{backend_subscription, ClientId}])).
|
||||
|
||||
%% @doc Delete a static subscription manually.
|
||||
-spec del_subscription(binary(), binary()) -> ok.
|
||||
-spec(del_subscription(binary(), binary()) -> ok).
|
||||
del_subscription(ClientId, Topic) when is_binary(ClientId) andalso is_binary(Topic) ->
|
||||
return(mnesia:transaction(fun del_subscription_/1, [match_pattern(ClientId, Topic)])).
|
||||
|
||||
|
|
|
@ -55,7 +55,7 @@
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Start a bridge
|
||||
-spec start_link(atom(), binary(), [option()]) -> {ok, pid()} | ignore | {error, term()}.
|
||||
-spec(start_link(atom(), binary(), [option()]) -> {ok, pid()} | ignore | {error, term()}).
|
||||
start_link(Node, Topic, Options) ->
|
||||
gen_server2:start_link(?MODULE, [Node, Topic, Options], []).
|
||||
|
||||
|
|
|
@ -33,17 +33,17 @@ start_link() ->
|
|||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||
|
||||
%% @doc List all bridges
|
||||
-spec bridges() -> [{tuple(), pid()}].
|
||||
-spec(bridges() -> [{tuple(), pid()}]).
|
||||
bridges() ->
|
||||
[{{Node, Topic}, Pid} || {?BRIDGE_ID(Node, Topic), Pid, worker, _}
|
||||
<- supervisor:which_children(?MODULE)].
|
||||
|
||||
%% @doc Start a bridge
|
||||
-spec start_bridge(atom(), binary()) -> {ok, pid()} | {error, any()}.
|
||||
-spec(start_bridge(atom(), binary()) -> {ok, pid()} | {error, any()}).
|
||||
start_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) ->
|
||||
start_bridge(Node, Topic, []).
|
||||
|
||||
-spec start_bridge(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, any()}.
|
||||
-spec(start_bridge(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, any()}).
|
||||
start_bridge(Node, _Topic, _Options) when Node =:= node() ->
|
||||
{error, bridge_to_self};
|
||||
start_bridge(Node, Topic, Options) when is_atom(Node) andalso is_binary(Topic) ->
|
||||
|
@ -51,7 +51,7 @@ start_bridge(Node, Topic, Options) when is_atom(Node) andalso is_binary(Topic) -
|
|||
supervisor:start_child(?MODULE, bridge_spec(Node, Topic, Options1)).
|
||||
|
||||
%% @doc Stop a bridge
|
||||
-spec stop_bridge(atom(), binary()) -> {ok, pid()} | ok.
|
||||
-spec(stop_bridge(atom(), binary()) -> {ok, pid()} | ok).
|
||||
stop_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) ->
|
||||
ChildId = ?BRIDGE_ID(Node, Topic),
|
||||
case supervisor:terminate_child(?MODULE, ChildId) of
|
||||
|
|
|
@ -57,17 +57,17 @@
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Start emqttd broker
|
||||
-spec start_link() -> {ok, pid()} | ignore | {error, any()}.
|
||||
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||
|
||||
%% @doc Subscribe broker event
|
||||
-spec subscribe(EventType :: any()) -> ok.
|
||||
-spec(subscribe(EventType :: any()) -> ok).
|
||||
subscribe(EventType) ->
|
||||
gproc:reg({p, l, {broker, EventType}}).
|
||||
|
||||
%% @doc Notify broker event
|
||||
-spec notify(EventType :: any(), Event :: any()) -> ok.
|
||||
-spec(notify(EventType :: any(), Event :: any()) -> ok).
|
||||
notify(EventType, Event) ->
|
||||
gproc:send({p, l, {broker, EventType}}, {notify, EventType, self(), Event}).
|
||||
|
||||
|
@ -76,21 +76,21 @@ env(Name) ->
|
|||
proplists:get_value(Name, emqttd:env(broker)).
|
||||
|
||||
%% @doc Get broker version
|
||||
-spec version() -> string().
|
||||
-spec(version() -> string()).
|
||||
version() ->
|
||||
{ok, Version} = application:get_key(emqttd, vsn), Version.
|
||||
|
||||
%% @doc Get broker description
|
||||
-spec sysdescr() -> string().
|
||||
-spec(sysdescr() -> string()).
|
||||
sysdescr() ->
|
||||
{ok, Descr} = application:get_key(emqttd, description), Descr.
|
||||
|
||||
%% @doc Get broker uptime
|
||||
-spec uptime() -> string().
|
||||
-spec(uptime() -> string()).
|
||||
uptime() -> gen_server:call(?SERVER, uptime).
|
||||
|
||||
%% @doc Get broker datetime
|
||||
-spec datetime() -> string().
|
||||
-spec(datetime() -> string()).
|
||||
datetime() ->
|
||||
{{Y, M, D}, {H, MM, S}} = calendar:local_time(),
|
||||
lists:flatten(
|
||||
|
|
|
@ -25,7 +25,7 @@
|
|||
-export([prepare/0, reboot/0]).
|
||||
|
||||
%% @doc Join cluster
|
||||
-spec join(node()) -> ok | {error, any()}.
|
||||
-spec(join(node()) -> ok | {error, any()}).
|
||||
join(Node) when Node =:= node() ->
|
||||
{error, {cannot_join_with_self, Node}};
|
||||
|
||||
|
@ -40,23 +40,23 @@ join(Node) when is_atom(Node) ->
|
|||
end.
|
||||
|
||||
%% @doc Prepare to join or leave cluster.
|
||||
-spec prepare() -> ok.
|
||||
-spec(prepare() -> ok).
|
||||
prepare() ->
|
||||
emqttd_plugins:unload(),
|
||||
lists:foreach(fun application:stop/1, [emqttd, mochiweb, esockd, gproc]).
|
||||
|
||||
%% @doc Is node in cluster?
|
||||
-spec is_clustered(node()) -> boolean().
|
||||
-spec(is_clustered(node()) -> boolean()).
|
||||
is_clustered(Node) ->
|
||||
lists:member(Node, emqttd_mnesia:running_nodes()).
|
||||
|
||||
%% @doc Reboot after join or leave cluster.
|
||||
-spec reboot() -> ok.
|
||||
-spec(reboot() -> ok).
|
||||
reboot() ->
|
||||
lists:foreach(fun application:start/1, [gproc, esockd, mochiweb, emqttd]).
|
||||
|
||||
%% @doc Leave from Cluster.
|
||||
-spec leave() -> ok | {error, any()}.
|
||||
-spec(leave() -> ok | {error, any()}).
|
||||
leave() ->
|
||||
case emqttd_mnesia:running_nodes() -- [node()] of
|
||||
[_|_] ->
|
||||
|
@ -66,7 +66,7 @@ leave() ->
|
|||
end.
|
||||
|
||||
%% @doc Remove a node from cluster.
|
||||
-spec remove(node()) -> ok | {error, any()}.
|
||||
-spec(remove(node()) -> ok | {error, any()}).
|
||||
remove(Node) when Node =:= node() ->
|
||||
{error, {cannot_remove_self, Node}};
|
||||
|
||||
|
|
|
@ -44,15 +44,15 @@
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Start Client Manager
|
||||
-spec start_link(Pool, Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when
|
||||
Pool :: atom(),
|
||||
Id :: pos_integer(),
|
||||
StatsFun :: fun().
|
||||
-spec(start_link(Pool, Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when
|
||||
Pool :: atom(),
|
||||
Id :: pos_integer(),
|
||||
StatsFun :: fun()).
|
||||
start_link(Pool, Id, StatsFun) ->
|
||||
gen_server2:start_link(?MODULE, [Pool, Id, StatsFun], []).
|
||||
|
||||
%% @doc Lookup Client by ClientId
|
||||
-spec lookup(ClientId :: binary()) -> mqtt_client() | undefined.
|
||||
-spec(lookup(ClientId :: binary()) -> mqtt_client() | undefined).
|
||||
lookup(ClientId) when is_binary(ClientId) ->
|
||||
case ets:lookup(mqtt_client, ClientId) of
|
||||
[Client] -> Client;
|
||||
|
@ -60,7 +60,7 @@ lookup(ClientId) when is_binary(ClientId) ->
|
|||
end.
|
||||
|
||||
%% @doc Lookup client pid by clientId
|
||||
-spec lookup_proc(ClientId :: binary()) -> pid() | undefined.
|
||||
-spec(lookup_proc(ClientId :: binary()) -> pid() | undefined).
|
||||
lookup_proc(ClientId) when is_binary(ClientId) ->
|
||||
try ets:lookup_element(mqtt_client, ClientId, #mqtt_client.client_pid)
|
||||
catch
|
||||
|
@ -68,13 +68,13 @@ lookup_proc(ClientId) when is_binary(ClientId) ->
|
|||
end.
|
||||
|
||||
%% @doc Register ClientId with Pid.
|
||||
-spec register(Client :: mqtt_client()) -> ok.
|
||||
-spec(register(Client :: mqtt_client()) -> ok).
|
||||
register(Client = #mqtt_client{client_id = ClientId}) ->
|
||||
CmPid = gproc_pool:pick_worker(?POOL, ClientId),
|
||||
gen_server2:cast(CmPid, {register, Client}).
|
||||
|
||||
%% @doc Unregister clientId with pid.
|
||||
-spec unregister(ClientId :: binary()) -> ok.
|
||||
-spec(unregister(ClientId :: binary()) -> ok).
|
||||
unregister(ClientId) when is_binary(ClientId) ->
|
||||
CmPid = gproc_pool:pick_worker(?POOL, ClientId),
|
||||
gen_server2:cast(CmPid, {unregister, ClientId, self()}).
|
||||
|
|
|
@ -44,24 +44,24 @@ start_link() ->
|
|||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||
|
||||
%% @doc Register a command
|
||||
-spec register_cmd(atom(), {module(), atom()}) -> ok.
|
||||
-spec(register_cmd(atom(), {module(), atom()}) -> ok).
|
||||
register_cmd(Cmd, MF) ->
|
||||
register_cmd(Cmd, MF, []).
|
||||
|
||||
%% @doc Register a command with opts
|
||||
-spec register_cmd(atom(), {module(), atom()}, list()) -> ok.
|
||||
-spec(register_cmd(atom(), {module(), atom()}, list()) -> ok).
|
||||
register_cmd(Cmd, MF, Opts) ->
|
||||
cast({register_cmd, Cmd, MF, Opts}).
|
||||
|
||||
%% @doc Unregister a command
|
||||
-spec unregister_cmd(atom()) -> ok.
|
||||
-spec(unregister_cmd(atom()) -> ok).
|
||||
unregister_cmd(Cmd) ->
|
||||
cast({unregister_cmd, Cmd}).
|
||||
|
||||
cast(Msg) -> gen_server:cast(?SERVER, Msg).
|
||||
|
||||
%% @doc Run a command
|
||||
-spec run([string()]) -> any().
|
||||
-spec(run([string()]) -> any()).
|
||||
run([]) -> usage();
|
||||
|
||||
run(["help"]) -> usage();
|
||||
|
@ -73,7 +73,7 @@ run([CmdS|Args]) ->
|
|||
end.
|
||||
|
||||
%% @doc Lookup a command
|
||||
-spec lookup(atom()) -> [{module(), atom()}].
|
||||
-spec(lookup(atom()) -> [{module(), atom()}]).
|
||||
lookup(Cmd) ->
|
||||
case ets:match(?CMD_TAB, {{'_', Cmd}, '$1', '_'}) of
|
||||
[El] -> El;
|
||||
|
|
|
@ -36,7 +36,7 @@
|
|||
-type guid() :: <<_:128>>.
|
||||
|
||||
%% @doc Generate a global unique id.
|
||||
-spec gen() -> guid().
|
||||
-spec(gen() -> guid()).
|
||||
gen() ->
|
||||
Guid = case get(guid) of
|
||||
undefined -> new();
|
||||
|
@ -47,7 +47,7 @@ gen() ->
|
|||
new() ->
|
||||
{ts(), npid(), 0}.
|
||||
|
||||
-spec timestamp(guid()) -> integer().
|
||||
-spec(timestamp(guid()) -> integer()).
|
||||
timestamp(<<Ts:64, _/binary>>) ->
|
||||
Ts.
|
||||
|
||||
|
|
|
@ -26,7 +26,7 @@
|
|||
-type keepalive() :: #keepalive{}.
|
||||
|
||||
%% @doc Start a keepalive
|
||||
-spec start(fun(), integer(), any()) -> undefined | keepalive().
|
||||
-spec(start(fun(), integer(), any()) -> undefined | keepalive()).
|
||||
start(_, 0, _) ->
|
||||
undefined;
|
||||
start(StatFun, TimeoutSec, TimeoutMsg) ->
|
||||
|
@ -36,7 +36,7 @@ start(StatFun, TimeoutSec, TimeoutMsg) ->
|
|||
tref = timer(TimeoutSec, TimeoutMsg)}.
|
||||
|
||||
%% @doc Check keepalive, called when timeout.
|
||||
-spec check(keepalive()) -> {ok, keepalive()} | {error, any()}.
|
||||
-spec(check(keepalive()) -> {ok, keepalive()} | {error, any()}).
|
||||
check(KeepAlive = #keepalive{statfun = StatFun, statval = LastVal, repeat = Repeat}) ->
|
||||
case StatFun() of
|
||||
{ok, NewVal} ->
|
||||
|
@ -55,7 +55,7 @@ resume(KeepAlive = #keepalive{tsec = TimeoutSec, tmsg = TimeoutMsg}) ->
|
|||
KeepAlive#keepalive{tref = timer(TimeoutSec, TimeoutMsg)}.
|
||||
|
||||
%% @doc Cancel Keepalive
|
||||
-spec cancel(keepalive()) -> ok.
|
||||
-spec(cancel(keepalive()) -> ok).
|
||||
cancel(#keepalive{tref = TRef}) ->
|
||||
cancel(TRef);
|
||||
cancel(undefined) ->
|
||||
|
|
|
@ -29,21 +29,21 @@
|
|||
-export([format/1]).
|
||||
|
||||
%% @doc Make a message
|
||||
-spec make(From, Topic, Payload) -> mqtt_message() when
|
||||
From :: atom() | binary(),
|
||||
Topic :: binary(),
|
||||
Payload :: binary().
|
||||
-spec(make(From, Topic, Payload) -> mqtt_message() when
|
||||
From :: atom() | binary(),
|
||||
Topic :: binary(),
|
||||
Payload :: binary()).
|
||||
make(From, Topic, Payload) ->
|
||||
#mqtt_message{topic = Topic,
|
||||
from = From,
|
||||
payload = Payload,
|
||||
timestamp = os:timestamp()}.
|
||||
|
||||
-spec make(From, Qos, Topic, Payload) -> mqtt_message() when
|
||||
From :: atom() | binary(),
|
||||
Qos :: mqtt_qos() | mqtt_qos_name(),
|
||||
Topic :: binary(),
|
||||
Payload :: binary().
|
||||
-spec(make(From, Qos, Topic, Payload) -> mqtt_message() when
|
||||
From :: atom() | binary(),
|
||||
Qos :: mqtt_qos() | mqtt_qos_name(),
|
||||
Topic :: binary(),
|
||||
Payload :: binary()).
|
||||
make(From, Qos, Topic, Payload) ->
|
||||
#mqtt_message{msgid = msgid(?QOS_I(Qos)),
|
||||
topic = Topic,
|
||||
|
@ -53,7 +53,7 @@ make(From, Qos, Topic, Payload) ->
|
|||
timestamp = os:timestamp()}.
|
||||
|
||||
%% @doc Message from Packet
|
||||
-spec from_packet(mqtt_packet()) -> mqtt_message().
|
||||
-spec(from_packet(mqtt_packet()) -> mqtt_message()).
|
||||
from_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
|
||||
retain = Retain,
|
||||
qos = Qos,
|
||||
|
@ -103,7 +103,7 @@ msgid(Qos) when Qos =:= ?QOS_1 orelse Qos =:= ?QOS_2 ->
|
|||
emqttd_guid:gen().
|
||||
|
||||
%% @doc Message to packet
|
||||
-spec to_packet(mqtt_message()) -> mqtt_packet().
|
||||
-spec(to_packet(mqtt_message()) -> mqtt_packet()).
|
||||
to_packet(#mqtt_message{pktid = PkgId,
|
||||
qos = Qos,
|
||||
retain = Retain,
|
||||
|
@ -124,11 +124,11 @@ to_packet(#mqtt_message{pktid = PkgId,
|
|||
payload = Payload}.
|
||||
|
||||
%% @doc set dup, retain flag
|
||||
-spec set_flag(mqtt_message()) -> mqtt_message().
|
||||
-spec(set_flag(mqtt_message()) -> mqtt_message()).
|
||||
set_flag(Msg) ->
|
||||
Msg#mqtt_message{dup = true, retain = true}.
|
||||
|
||||
-spec set_flag(atom(), mqtt_message()) -> mqtt_message().
|
||||
-spec(set_flag(atom(), mqtt_message()) -> mqtt_message()).
|
||||
set_flag(dup, Msg = #mqtt_message{dup = false}) ->
|
||||
Msg#mqtt_message{dup = true};
|
||||
set_flag(sys, Msg = #mqtt_message{sys = false}) ->
|
||||
|
@ -138,11 +138,11 @@ set_flag(retain, Msg = #mqtt_message{retain = false}) ->
|
|||
set_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg.
|
||||
|
||||
%% @doc Unset dup, retain flag
|
||||
-spec unset_flag(mqtt_message()) -> mqtt_message().
|
||||
-spec(unset_flag(mqtt_message()) -> mqtt_message()).
|
||||
unset_flag(Msg) ->
|
||||
Msg#mqtt_message{dup = false, retain = false}.
|
||||
|
||||
-spec unset_flag(dup | retain | atom(), mqtt_message()) -> mqtt_message().
|
||||
-spec(unset_flag(dup | retain | atom(), mqtt_message()) -> mqtt_message()).
|
||||
unset_flag(dup, Msg = #mqtt_message{dup = true}) ->
|
||||
Msg#mqtt_message{dup = false};
|
||||
unset_flag(retain, Msg = #mqtt_message{retain = true}) ->
|
||||
|
|
|
@ -91,12 +91,12 @@
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Start metrics server
|
||||
-spec start_link() -> {ok, pid()} | ignore | {error, any()}.
|
||||
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||
|
||||
%% @doc Count packets received.
|
||||
-spec received(mqtt_packet()) -> ok.
|
||||
-spec(received(mqtt_packet()) -> ok).
|
||||
received(Packet) ->
|
||||
inc('packets/received'),
|
||||
received1(Packet).
|
||||
|
@ -134,7 +134,7 @@ qos_received(?QOS_2) ->
|
|||
inc('messages/qos2/received').
|
||||
|
||||
%% @doc Count packets received. Will not count $SYS PUBLISH.
|
||||
-spec sent(mqtt_packet()) -> ok.
|
||||
-spec(sent(mqtt_packet()) -> ok).
|
||||
sent(?PUBLISH_PACKET(_Qos, <<"$SYS/", _/binary>>, _, _)) ->
|
||||
ignore;
|
||||
sent(Packet) ->
|
||||
|
@ -172,7 +172,7 @@ qos_sent(?QOS_2) ->
|
|||
inc('messages/qos2/sent').
|
||||
|
||||
%% @doc Get all metrics
|
||||
-spec all() -> [{atom(), non_neg_integer()}].
|
||||
-spec(all() -> [{atom(), non_neg_integer()}]).
|
||||
all() ->
|
||||
maps:to_list(
|
||||
ets:foldl(
|
||||
|
@ -184,17 +184,17 @@ all() ->
|
|||
end, #{}, ?METRIC_TAB)).
|
||||
|
||||
%% @doc Get metric value
|
||||
-spec value(atom()) -> non_neg_integer().
|
||||
-spec(value(atom()) -> non_neg_integer()).
|
||||
value(Metric) ->
|
||||
lists:sum(ets:select(?METRIC_TAB, [{{{Metric, '_'}, '$1'}, [], ['$1']}])).
|
||||
|
||||
%% @doc Increase counter
|
||||
-spec inc(atom()) -> non_neg_integer().
|
||||
-spec(inc(atom()) -> non_neg_integer()).
|
||||
inc(Metric) ->
|
||||
inc(counter, Metric, 1).
|
||||
|
||||
%% @doc Increase metric value
|
||||
-spec inc({counter | gauge, atom()} | atom(), pos_integer()) -> non_neg_integer().
|
||||
-spec(inc({counter | gauge, atom()} | atom(), pos_integer()) -> non_neg_integer()).
|
||||
inc({gauge, Metric}, Val) ->
|
||||
inc(gauge, Metric, Val);
|
||||
inc({counter, Metric}, Val) ->
|
||||
|
@ -203,19 +203,19 @@ inc(Metric, Val) when is_atom(Metric) ->
|
|||
inc(counter, Metric, Val).
|
||||
|
||||
%% @doc Increase metric value
|
||||
-spec inc(counter | gauge, atom(), pos_integer()) -> pos_integer().
|
||||
-spec(inc(counter | gauge, atom(), pos_integer()) -> pos_integer()).
|
||||
inc(gauge, Metric, Val) ->
|
||||
ets:update_counter(?METRIC_TAB, key(gauge, Metric), {2, Val});
|
||||
inc(counter, Metric, Val) ->
|
||||
ets:update_counter(?METRIC_TAB, key(counter, Metric), {2, Val}).
|
||||
|
||||
%% @doc Decrease metric value
|
||||
-spec dec(gauge, atom()) -> integer().
|
||||
-spec(dec(gauge, atom()) -> integer()).
|
||||
dec(gauge, Metric) ->
|
||||
dec(gauge, Metric, 1).
|
||||
|
||||
%% @doc Decrease metric value
|
||||
-spec dec(gauge, atom(), pos_integer()) -> integer().
|
||||
-spec(dec(gauge, atom(), pos_integer()) -> integer()).
|
||||
dec(gauge, Metric, Val) ->
|
||||
ets:update_counter(?METRIC_TAB, key(gauge, Metric), {2, -Val}).
|
||||
|
||||
|
|
|
@ -36,7 +36,7 @@
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Start mnesia database.
|
||||
-spec start() -> ok.
|
||||
-spec(start() -> ok).
|
||||
start() ->
|
||||
ensure_ok(ensure_data_dir()),
|
||||
ensure_ok(init_schema()),
|
||||
|
@ -53,12 +53,12 @@ ensure_data_dir() ->
|
|||
end.
|
||||
|
||||
%% @doc ensure mnesia started.
|
||||
-spec ensure_started() -> ok | {error, any()}.
|
||||
-spec(ensure_started() -> ok | {error, any()}).
|
||||
ensure_started() ->
|
||||
ok = mnesia:start(), wait_for(start).
|
||||
|
||||
%% @doc ensure mnesia stopped.
|
||||
-spec ensure_stopped() -> ok | {error, any()}.
|
||||
-spec(ensure_stopped() -> ok | {error, any()}).
|
||||
ensure_stopped() ->
|
||||
stopped = mnesia:stop(), wait_for(stop).
|
||||
|
||||
|
@ -87,16 +87,16 @@ copy_tables() ->
|
|||
emqttd_boot:apply_module_attributes(copy_mnesia).
|
||||
|
||||
%% @doc Create mnesia table.
|
||||
-spec create_table(Name:: atom(), TabDef :: list()) -> ok | {error, any()}.
|
||||
-spec(create_table(Name:: atom(), TabDef :: list()) -> ok | {error, any()}).
|
||||
create_table(Name, TabDef) ->
|
||||
ensure_tab(mnesia:create_table(Name, TabDef)).
|
||||
|
||||
%% @doc Copy mnesia table.
|
||||
-spec copy_table(Name :: atom()) -> ok.
|
||||
-spec(copy_table(Name :: atom()) -> ok).
|
||||
copy_table(Name) ->
|
||||
copy_table(Name, ram_copies).
|
||||
|
||||
-spec copy_table(Name:: atom(), ram_copies | disc_copies) -> ok.
|
||||
-spec(copy_table(Name:: atom(), ram_copies | disc_copies) -> ok).
|
||||
copy_table(Name, RamOrDisc) ->
|
||||
ensure_tab(mnesia:add_table_copy(Name, node(), RamOrDisc)).
|
||||
|
||||
|
@ -127,7 +127,7 @@ del_schema_copy(Node) ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Join the mnesia cluster
|
||||
-spec join_cluster(node()) -> ok.
|
||||
-spec(join_cluster(node()) -> ok).
|
||||
join_cluster(Node) when Node =/= node() ->
|
||||
%% Stop mnesia and delete schema first
|
||||
ensure_ok(ensure_stopped()),
|
||||
|
@ -141,7 +141,7 @@ join_cluster(Node) when Node =/= node() ->
|
|||
ensure_ok(wait_for(tables)).
|
||||
|
||||
%% @doc Cluster status
|
||||
-spec cluster_status() -> list().
|
||||
-spec(cluster_status() -> list()).
|
||||
cluster_status() ->
|
||||
Running = mnesia:system_info(running_db_nodes),
|
||||
Stopped = mnesia:system_info(db_nodes) -- Running,
|
||||
|
@ -149,7 +149,7 @@ cluster_status() ->
|
|||
[{running_nodes, Running}, {stopped_nodes, Stopped}]).
|
||||
|
||||
%% @doc This node try leave the cluster
|
||||
-spec leave_cluster() -> ok | {error, any()}.
|
||||
-spec(leave_cluster() -> ok | {error, any()}).
|
||||
leave_cluster() ->
|
||||
case running_nodes() -- [node()] of
|
||||
[] ->
|
||||
|
@ -166,7 +166,7 @@ leave_cluster() ->
|
|||
end
|
||||
end.
|
||||
|
||||
-spec leave_cluster(node()) -> ok | {error, any()}.
|
||||
-spec(leave_cluster(node()) -> ok | {error, any()}).
|
||||
leave_cluster(Node) when Node =/= node() ->
|
||||
case is_running_db_node(Node) of
|
||||
true ->
|
||||
|
@ -179,7 +179,7 @@ leave_cluster(Node) when Node =/= node() ->
|
|||
end.
|
||||
|
||||
%% @doc Remove node from mnesia cluster.
|
||||
-spec remove_from_cluster(node()) -> ok | {error, any()}.
|
||||
-spec(remove_from_cluster(node()) -> ok | {error, any()}).
|
||||
remove_from_cluster(Node) when Node =/= node() ->
|
||||
case {is_node_in_cluster(Node), is_running_db_node(Node)} of
|
||||
{true, true} ->
|
||||
|
@ -203,7 +203,7 @@ is_running_db_node(Node) ->
|
|||
lists:member(Node, running_nodes()).
|
||||
|
||||
%% @doc Cluster with node.
|
||||
-spec connect(node()) -> ok | {error, any()}.
|
||||
-spec(connect(node()) -> ok | {error, any()}).
|
||||
connect(Node) ->
|
||||
case mnesia:change_config(extra_db_nodes, [Node]) of
|
||||
{ok, [Node]} -> ok;
|
||||
|
@ -212,7 +212,7 @@ connect(Node) ->
|
|||
end.
|
||||
|
||||
%% @doc Running nodes
|
||||
-spec running_nodes() -> list(node()).
|
||||
-spec(running_nodes() -> list(node())).
|
||||
running_nodes() ->
|
||||
mnesia:system_info(running_db_nodes).
|
||||
|
||||
|
@ -229,7 +229,7 @@ ensure_tab({aborted, {already_exists, _Name, _Node}})-> ok;
|
|||
ensure_tab({aborted, Error}) -> Error.
|
||||
|
||||
%% @doc Wait for mnesia to start, stop or tables ready.
|
||||
-spec wait_for(start | stop | tables) -> ok | {error, Reason :: atom()}.
|
||||
-spec(wait_for(start | stop | tables) -> ok | {error, Reason :: atom()}).
|
||||
wait_for(start) ->
|
||||
case mnesia:system_info(is_running) of
|
||||
yes -> ok;
|
||||
|
|
|
@ -88,7 +88,7 @@
|
|||
-export_type([mqueue/0, priority/0, option/0]).
|
||||
|
||||
%% @doc New Queue.
|
||||
-spec new(iolist(), list(mqueue_option()), fun()) -> mqueue().
|
||||
-spec(new(iolist(), list(mqueue_option()), fun()) -> mqueue()).
|
||||
new(Name, Opts, AlarmFun) ->
|
||||
Type = get_value(type, Opts, simple),
|
||||
MaxLen = get_value(max_length, Opts, infinity),
|
||||
|
@ -125,11 +125,11 @@ high_wm(infinity, _Opts) ->
|
|||
high_wm(MaxLen, Opts) ->
|
||||
round(MaxLen * get_value(high_watermark, Opts, ?HIGH_WM)).
|
||||
|
||||
-spec name(mqueue()) -> iolist().
|
||||
-spec(name(mqueue()) -> iolist()).
|
||||
name(#mqueue{name = Name}) ->
|
||||
Name.
|
||||
|
||||
-spec type(mqueue()) -> atom().
|
||||
-spec(type(mqueue()) -> atom()).
|
||||
type(#mqueue{type = Type}) ->
|
||||
Type.
|
||||
|
||||
|
@ -142,7 +142,7 @@ len(#mqueue{type = priority, q = Q}) -> priority_queue:len(Q).
|
|||
max_len(#mqueue{max_len= MaxLen}) -> MaxLen.
|
||||
|
||||
%% @doc Stats of the mqueue
|
||||
-spec stats(mqueue()) -> [stat()].
|
||||
-spec(stats(mqueue()) -> [stat()]).
|
||||
stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped}) ->
|
||||
[{len, case Type of
|
||||
simple -> Len;
|
||||
|
@ -150,7 +150,7 @@ stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped
|
|||
end} | [{max_len, MaxLen}, {dropped, Dropped}]].
|
||||
|
||||
%% @doc Enqueue a message.
|
||||
-spec in(mqtt_message(), mqueue()) -> mqueue().
|
||||
-spec(in(mqtt_message(), mqueue()) -> mqueue()).
|
||||
in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) ->
|
||||
MQ;
|
||||
in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = infinity}) ->
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
-export([is_aliving/1, parse_name/1]).
|
||||
|
||||
%% @doc Is node aliving
|
||||
-spec is_aliving(node()) -> boolean().
|
||||
-spec(is_aliving(node()) -> boolean()).
|
||||
is_aliving(Node) ->
|
||||
case net_adm:ping(Node) of
|
||||
pong -> true;
|
||||
|
@ -29,7 +29,7 @@ is_aliving(Node) ->
|
|||
end.
|
||||
|
||||
%% @doc Parse node name
|
||||
-spec parse_name(string()) -> atom().
|
||||
-spec(parse_name(string()) -> atom()).
|
||||
parse_name(Name) when is_list(Name) ->
|
||||
case string:tokens(Name, "@") of
|
||||
[_Node, _Host] -> list_to_atom(Name);
|
||||
|
|
|
@ -27,17 +27,17 @@
|
|||
-export([format/1]).
|
||||
|
||||
%% @doc Protocol name of version
|
||||
-spec protocol_name(mqtt_vsn()) -> binary().
|
||||
-spec(protocol_name(mqtt_vsn()) -> binary()).
|
||||
protocol_name(Ver) when Ver =:= ?MQTT_PROTO_V31; Ver =:= ?MQTT_PROTO_V311 ->
|
||||
proplists:get_value(Ver, ?PROTOCOL_NAMES).
|
||||
|
||||
%% @doc Name of MQTT packet type
|
||||
-spec type_name(mqtt_packet_type()) -> atom().
|
||||
-spec(type_name(mqtt_packet_type()) -> atom()).
|
||||
type_name(Type) when Type > ?RESERVED andalso Type =< ?DISCONNECT ->
|
||||
lists:nth(Type, ?TYPE_NAMES).
|
||||
|
||||
%% @doc Connack Name
|
||||
-spec connack_name(mqtt_connack()) -> atom().
|
||||
-spec(connack_name(mqtt_connack()) -> atom()).
|
||||
connack_name(?CONNACK_ACCEPT) -> 'CONNACK_ACCEPT';
|
||||
connack_name(?CONNACK_PROTO_VER) -> 'CONNACK_PROTO_VER';
|
||||
connack_name(?CONNACK_INVALID_ID) -> 'CONNACK_INVALID_ID';
|
||||
|
@ -46,7 +46,7 @@ connack_name(?CONNACK_CREDENTIALS) -> 'CONNACK_CREDENTIALS';
|
|||
connack_name(?CONNACK_AUTH) -> 'CONNACK_AUTH'.
|
||||
|
||||
%% @doc Format packet
|
||||
-spec format(mqtt_packet()) -> iolist().
|
||||
-spec(format(mqtt_packet()) -> iolist()).
|
||||
format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}) ->
|
||||
format_header(Header, format_variable(Variable, Payload)).
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@
|
|||
-type parser() :: fun( (binary()) -> any() ).
|
||||
|
||||
%% @doc Initialize a parser
|
||||
-spec new(Opts :: [option()]) -> parser().
|
||||
-spec(new(Opts :: [option()]) -> parser()).
|
||||
new(Opts) ->
|
||||
fun(Bin) -> parse(Bin, {none, limit(Opts)}) end.
|
||||
|
||||
|
@ -40,8 +40,8 @@ limit(Opts) ->
|
|||
proplists:get_value(max_packet_size, Opts, ?MAX_LEN)}.
|
||||
|
||||
%% @doc Parse MQTT Packet
|
||||
-spec parse(binary(), {none, [option()]} | fun()) ->
|
||||
{ok, mqtt_packet()} | {error, any()} | {more, fun()}.
|
||||
-spec(parse(binary(), {none, [option()]} | fun())
|
||||
-> {ok, mqtt_packet()} | {error, any()} | {more, fun()}).
|
||||
parse(<<>>, {none, Limit}) ->
|
||||
{more, fun(Bin) -> parse(Bin, {none, Limit}) end};
|
||||
parse(<<PacketType:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Limit}) ->
|
||||
|
|
|
@ -25,7 +25,7 @@
|
|||
-export([list/0]).
|
||||
|
||||
%% @doc Load all plugins when the broker started.
|
||||
-spec load() -> list() | {error, any()}.
|
||||
-spec(load() -> list() | {error, any()}).
|
||||
load() ->
|
||||
case env(loaded_file) of
|
||||
{ok, File} ->
|
||||
|
@ -54,7 +54,7 @@ load_plugins(Names, Persistent) ->
|
|||
[load_plugin(find_plugin(Name, Plugins), Persistent) || Name <- NeedToLoad].
|
||||
|
||||
%% @doc Unload all plugins before broker stopped.
|
||||
-spec unload() -> list() | {error, any()}.
|
||||
-spec(unload() -> list() | {error, any()}).
|
||||
unload() ->
|
||||
case env(loaded_file) of
|
||||
{ok, File} ->
|
||||
|
@ -68,7 +68,7 @@ stop_plugins(Names) ->
|
|||
[stop_app(App) || App <- Names].
|
||||
|
||||
%% @doc List all available plugins
|
||||
-spec list() -> [mqtt_plugin()].
|
||||
-spec(list() -> [mqtt_plugin()]).
|
||||
list() ->
|
||||
case env(plugins_dir) of
|
||||
{ok, PluginsDir} ->
|
||||
|
@ -102,7 +102,7 @@ plugin(PluginsDir, AppFile0) ->
|
|||
#mqtt_plugin{name = Name, version = Ver, config = AppsEnv1, descr = Descr}.
|
||||
|
||||
%% @doc Load One Plugin
|
||||
-spec load(atom()) -> ok | {error, any()}.
|
||||
-spec(load(atom()) -> ok | {error, any()}).
|
||||
load(PluginName) when is_atom(PluginName) ->
|
||||
case lists:member(PluginName, names(started_app)) of
|
||||
true ->
|
||||
|
@ -162,7 +162,7 @@ find_plugin(Name, Plugins) ->
|
|||
lists:keyfind(Name, 2, Plugins).
|
||||
|
||||
%% @doc UnLoad One Plugin
|
||||
-spec unload(atom()) -> ok | {error, any()}.
|
||||
-spec(unload(atom()) -> ok | {error, any()}).
|
||||
unload(PluginName) when is_atom(PluginName) ->
|
||||
case {lists:member(PluginName, names(started_app)), lists:member(PluginName, names(plugin))} of
|
||||
{true, true} ->
|
||||
|
|
|
@ -25,21 +25,21 @@
|
|||
%% Supervisor callbacks
|
||||
-export([init/1]).
|
||||
|
||||
-spec spec(list()) -> supervisor:child_spec().
|
||||
-spec(spec(list()) -> supervisor:child_spec()).
|
||||
spec(Args) ->
|
||||
spec(pool_sup, Args).
|
||||
|
||||
-spec spec(any(), list()) -> supervisor:child_spec().
|
||||
-spec(spec(any(), list()) -> supervisor:child_spec()).
|
||||
spec(ChildId, Args) ->
|
||||
{ChildId, {?MODULE, start_link, Args},
|
||||
transient, infinity, supervisor, [?MODULE]}.
|
||||
|
||||
-spec start_link(atom(), atom(), mfa()) -> {ok, pid()} | {error, any()}.
|
||||
-spec(start_link(atom(), atom(), mfa()) -> {ok, pid()} | {error, any()}).
|
||||
start_link(Pool, Type, MFA) ->
|
||||
Schedulers = erlang:system_info(schedulers),
|
||||
start_link(Pool, Type, Schedulers, MFA).
|
||||
|
||||
-spec start_link(atom(), atom(), pos_integer(), mfa()) -> {ok, pid()} | {error, any()}.
|
||||
-spec(start_link(atom(), atom(), pos_integer(), mfa()) -> {ok, pid()} | {error, any()}).
|
||||
start_link(Pool, Type, Size, MFA) ->
|
||||
supervisor:start_link({local, sup_name(Pool)}, ?MODULE, [Pool, Type, Size, MFA]).
|
||||
|
||||
|
|
|
@ -39,7 +39,7 @@ start_link() ->
|
|||
%%--------------------------------------------------------------------
|
||||
%% API
|
||||
%%--------------------------------------------------------------------
|
||||
-spec start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, any()}.
|
||||
-spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, any()}).
|
||||
start_link(Pool, Id) ->
|
||||
gen_server:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id], []).
|
||||
|
||||
|
|
|
@ -96,7 +96,7 @@ session(#proto_state{session = Session}) ->
|
|||
%% CONNECT – Client requests a connection to a Server
|
||||
|
||||
%% A Client can only send the CONNECT Packet once over a Network Connection.
|
||||
-spec received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}.
|
||||
-spec(received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}).
|
||||
received(Packet = ?PACKET(?CONNECT), State = #proto_state{connected = false}) ->
|
||||
process(Packet, State#proto_state{connected = true});
|
||||
|
||||
|
@ -246,7 +246,7 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId),
|
|||
?LOG(error, "PUBLISH ~p error: ~p", [PacketId, Error], State)
|
||||
end.
|
||||
|
||||
-spec send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}.
|
||||
-spec(send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}).
|
||||
send(Msg, State = #proto_state{client_id = ClientId})
|
||||
when is_record(Msg, mqtt_message) ->
|
||||
emqttd:run_hooks('message.delivered', [ClientId], Msg),
|
||||
|
@ -309,7 +309,7 @@ send_willmsg(ClientId, WillMsg) ->
|
|||
start_keepalive(0) -> ignore;
|
||||
|
||||
start_keepalive(Sec) when Sec > 0 ->
|
||||
self() ! {keepalive, start, round(Sec * 0.75)}.
|
||||
self() ! {keepalive, start, Sec}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Validate Packets
|
||||
|
|
|
@ -148,7 +148,7 @@ del_route_(Route = #mqtt_route{topic = Topic}) ->
|
|||
end.
|
||||
|
||||
%% @doc Has Route?
|
||||
-spec has_route(binary()) -> boolean().
|
||||
-spec(has_route(binary()) -> boolean()).
|
||||
has_route(Topic) ->
|
||||
Routes = case mnesia:is_transaction() of
|
||||
true -> mnesia:read(route, Topic);
|
||||
|
|
|
@ -25,7 +25,7 @@
|
|||
-export([serialize/1]).
|
||||
|
||||
%% @doc Serialise MQTT Packet
|
||||
-spec serialize(mqtt_packet()) -> binary().
|
||||
-spec(serialize(mqtt_packet()) -> binary()).
|
||||
serialize(#mqtt_packet{header = Header = #mqtt_packet_header{type = Type},
|
||||
variable = Variable,
|
||||
payload = Payload}) ->
|
||||
|
|
|
@ -136,12 +136,12 @@
|
|||
"Session(~s): " ++ Format, [State#session.client_id | Args])).
|
||||
|
||||
%% @doc Start a session.
|
||||
-spec start_link(boolean(), mqtt_client_id(), pid()) -> {ok, pid()} | {error, any()}.
|
||||
-spec(start_link(boolean(), mqtt_client_id(), pid()) -> {ok, pid()} | {error, any()}).
|
||||
start_link(CleanSess, ClientId, ClientPid) ->
|
||||
gen_server2:start_link(?MODULE, [CleanSess, ClientId, ClientPid], []).
|
||||
|
||||
%% @doc Resume a session.
|
||||
-spec resume(pid(), mqtt_client_id(), pid()) -> ok.
|
||||
-spec(resume(pid(), mqtt_client_id(), pid()) -> ok).
|
||||
resume(SessPid, ClientId, ClientPid) ->
|
||||
gen_server2:cast(SessPid, {resume, ClientId, ClientPid}).
|
||||
|
||||
|
@ -150,7 +150,7 @@ info(SessPid) ->
|
|||
gen_server2:call(SessPid, info).
|
||||
|
||||
%% @doc Destroy a session.
|
||||
-spec destroy(pid(), mqtt_client_id()) -> ok.
|
||||
-spec(destroy(pid(), mqtt_client_id()) -> ok).
|
||||
destroy(SessPid, ClientId) ->
|
||||
gen_server2:cast(SessPid, {destroy, ClientId}).
|
||||
|
||||
|
@ -159,11 +159,11 @@ destroy(SessPid, ClientId) ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Subscribe Topics
|
||||
-spec subscribe(pid(), [{binary(), mqtt_qos()}]) -> ok.
|
||||
-spec(subscribe(pid(), [{binary(), mqtt_qos()}]) -> ok).
|
||||
subscribe(SessPid, TopicTable) ->
|
||||
gen_server2:cast(SessPid, {subscribe, TopicTable, fun(_) -> ok end}).
|
||||
|
||||
-spec subscribe(pid(), mqtt_packet_id(), [{binary(), mqtt_qos()}]) -> ok.
|
||||
-spec(subscribe(pid(), mqtt_packet_id(), [{binary(), mqtt_qos()}]) -> ok).
|
||||
subscribe(SessPid, PacketId, TopicTable) ->
|
||||
From = self(),
|
||||
AckFun = fun(GrantedQos) ->
|
||||
|
@ -172,7 +172,7 @@ subscribe(SessPid, PacketId, TopicTable) ->
|
|||
gen_server2:cast(SessPid, {subscribe, TopicTable, AckFun}).
|
||||
|
||||
%% @doc Publish message
|
||||
-spec publish(pid(), mqtt_message()) -> ok | {error, any()}.
|
||||
-spec(publish(pid(), mqtt_message()) -> ok | {error, any()}).
|
||||
publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_0}) ->
|
||||
%% publish qos0 directly
|
||||
emqttd:publish(Msg);
|
||||
|
@ -186,24 +186,24 @@ publish(SessPid, Msg = #mqtt_message{qos = ?QOS_2}) ->
|
|||
gen_server2:call(SessPid, {publish, Msg}, ?PUBSUB_TIMEOUT).
|
||||
|
||||
%% @doc PubAck message
|
||||
-spec puback(pid(), mqtt_packet_id()) -> ok.
|
||||
-spec(puback(pid(), mqtt_packet_id()) -> ok).
|
||||
puback(SessPid, PktId) ->
|
||||
gen_server2:cast(SessPid, {puback, PktId}).
|
||||
|
||||
-spec pubrec(pid(), mqtt_packet_id()) -> ok.
|
||||
-spec(pubrec(pid(), mqtt_packet_id()) -> ok).
|
||||
pubrec(SessPid, PktId) ->
|
||||
gen_server2:cast(SessPid, {pubrec, PktId}).
|
||||
|
||||
-spec pubrel(pid(), mqtt_packet_id()) -> ok.
|
||||
-spec(pubrel(pid(), mqtt_packet_id()) -> ok).
|
||||
pubrel(SessPid, PktId) ->
|
||||
gen_server2:cast(SessPid, {pubrel, PktId}).
|
||||
|
||||
-spec pubcomp(pid(), mqtt_packet_id()) -> ok.
|
||||
-spec(pubcomp(pid(), mqtt_packet_id()) -> ok).
|
||||
pubcomp(SessPid, PktId) ->
|
||||
gen_server2:cast(SessPid, {pubcomp, PktId}).
|
||||
|
||||
%% @doc Unsubscribe Topics
|
||||
-spec unsubscribe(pid(), [binary()]) -> ok.
|
||||
-spec(unsubscribe(pid(), [binary()]) -> ok).
|
||||
unsubscribe(SessPid, Topics) ->
|
||||
gen_server2:cast(SessPid, {unsubscribe, Topics}).
|
||||
|
||||
|
@ -320,7 +320,6 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id =
|
|||
hibernate(Session)
|
||||
end;
|
||||
|
||||
|
||||
handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId,
|
||||
subscriptions = Subscriptions}) ->
|
||||
|
||||
|
|
|
@ -24,12 +24,12 @@
|
|||
-export([init/1]).
|
||||
|
||||
%% @doc Start session supervisor
|
||||
-spec start_link() -> {ok, pid()}.
|
||||
-spec(start_link() -> {ok, pid()}).
|
||||
start_link() ->
|
||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||
|
||||
%% @doc Start a session
|
||||
-spec start_session(boolean(), binary(), pid()) -> {ok, pid()}.
|
||||
-spec(start_session(boolean(), binary(), pid()) -> {ok, pid()}).
|
||||
start_session(CleanSess, ClientId, ClientPid) ->
|
||||
supervisor:start_child(?MODULE, [CleanSess, ClientId, ClientPid]).
|
||||
|
||||
|
|
|
@ -72,18 +72,18 @@ mnesia(copy) ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Start a session manager
|
||||
-spec start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, any()}.
|
||||
-spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, any()}).
|
||||
start_link(Pool, Id) ->
|
||||
gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id], []).
|
||||
|
||||
%% @doc Start a session
|
||||
-spec start_session(CleanSess :: boolean(), binary()) -> {ok, pid(), boolean()} | {error, any()}.
|
||||
-spec(start_session(boolean(), binary()) -> {ok, pid(), boolean()} | {error, any()}).
|
||||
start_session(CleanSess, ClientId) ->
|
||||
SM = gproc_pool:pick_worker(?POOL, ClientId),
|
||||
call(SM, {start_session, {CleanSess, ClientId, self()}}).
|
||||
|
||||
%% @doc Lookup a Session
|
||||
-spec lookup_session(binary()) -> mqtt_session() | undefined.
|
||||
-spec(lookup_session(binary()) -> mqtt_session() | undefined).
|
||||
lookup_session(ClientId) ->
|
||||
case mnesia:dirty_read(session, ClientId) of
|
||||
[Session] -> Session;
|
||||
|
@ -91,17 +91,17 @@ lookup_session(ClientId) ->
|
|||
end.
|
||||
|
||||
%% @doc Register a session with info.
|
||||
-spec register_session(CleanSess, ClientId, Info) -> ok when
|
||||
CleanSess :: boolean(),
|
||||
ClientId :: binary(),
|
||||
Info :: [tuple()].
|
||||
-spec(register_session(CleanSess, ClientId, Info) -> ok when
|
||||
CleanSess :: boolean(),
|
||||
ClientId :: binary(),
|
||||
Info :: [tuple()]).
|
||||
register_session(CleanSess, ClientId, Info) ->
|
||||
ets:insert(sesstab(CleanSess), {{ClientId, self()}, Info}).
|
||||
|
||||
%% @doc Unregister a session.
|
||||
-spec unregister_session(CleanSess, ClientId) -> ok when
|
||||
CleanSess :: boolean(),
|
||||
ClientId :: binary().
|
||||
-spec(unregister_session(CleanSess, ClientId) -> ok when
|
||||
CleanSess :: boolean(),
|
||||
ClientId :: binary()).
|
||||
unregister_session(CleanSess, ClientId) ->
|
||||
ets:delete(sesstab(CleanSess), {ClientId, self()}).
|
||||
|
||||
|
|
|
@ -35,7 +35,7 @@
|
|||
-record(state, {stats_fun, tick_tref}).
|
||||
|
||||
%% @doc Start a session helper
|
||||
-spec start_link(fun()) -> {ok, pid()} | ignore | {error, any()}.
|
||||
-spec(start_link(fun()) -> {ok, pid()} | ignore | {error, any()}).
|
||||
start_link(StatsFun) ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [StatsFun], []).
|
||||
|
||||
|
|
|
@ -73,7 +73,7 @@
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Start stats server
|
||||
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
|
||||
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||
|
||||
|
@ -81,21 +81,21 @@ stop() ->
|
|||
gen_server:call(?SERVER, stop).
|
||||
|
||||
%% @doc Generate stats fun
|
||||
-spec statsfun(Stat :: atom()) -> fun().
|
||||
-spec(statsfun(Stat :: atom()) -> fun()).
|
||||
statsfun(Stat) ->
|
||||
fun(Val) -> setstat(Stat, Val) end.
|
||||
|
||||
-spec statsfun(Stat :: atom(), MaxStat :: atom()) -> fun().
|
||||
-spec(statsfun(Stat :: atom(), MaxStat :: atom()) -> fun()).
|
||||
statsfun(Stat, MaxStat) ->
|
||||
fun(Val) -> setstats(Stat, MaxStat, Val) end.
|
||||
|
||||
%% @doc Get broker statistics
|
||||
-spec getstats() -> [{atom(), non_neg_integer()}].
|
||||
-spec(getstats() -> [{atom(), non_neg_integer()}]).
|
||||
getstats() ->
|
||||
lists:sort(ets:tab2list(?STATS_TAB)).
|
||||
|
||||
%% @doc Get stats by name
|
||||
-spec getstat(atom()) -> non_neg_integer() | undefined.
|
||||
-spec(getstat(atom()) -> non_neg_integer() | undefined).
|
||||
getstat(Name) ->
|
||||
case ets:lookup(?STATS_TAB, Name) of
|
||||
[{Name, Val}] -> Val;
|
||||
|
@ -103,12 +103,12 @@ getstat(Name) ->
|
|||
end.
|
||||
|
||||
%% @doc Set broker stats
|
||||
-spec setstat(Stat :: atom(), Val :: pos_integer()) -> boolean().
|
||||
-spec(setstat(Stat :: atom(), Val :: pos_integer()) -> boolean()).
|
||||
setstat(Stat, Val) ->
|
||||
ets:update_element(?STATS_TAB, Stat, {2, Val}).
|
||||
|
||||
%% @doc Set stats with max
|
||||
-spec setstats(Stat :: atom(), MaxStat :: atom(), Val :: pos_integer()) -> boolean().
|
||||
-spec(setstats(Stat :: atom(), MaxStat :: atom(), Val :: pos_integer()) -> boolean()).
|
||||
setstats(Stat, MaxStat, Val) ->
|
||||
gen_server:cast(?MODULE, {setstats, Stat, MaxStat, Val}).
|
||||
|
||||
|
|
|
@ -41,7 +41,7 @@ start_link() ->
|
|||
start_child(ChildSpec) when is_tuple(ChildSpec) ->
|
||||
supervisor:start_child(?MODULE, ChildSpec).
|
||||
|
||||
-spec start_child(Mod::atom(), Type :: worker | supervisor) -> {ok, pid()}.
|
||||
-spec(start_child(Mod::atom(), Type :: worker | supervisor) -> {ok, pid()}).
|
||||
start_child(Mod, Type) when is_atom(Mod) and is_atom(Type) ->
|
||||
supervisor:start_child(?MODULE, ?CHILD(Mod, Type)).
|
||||
|
||||
|
|
|
@ -37,8 +37,8 @@
|
|||
lager:warning([{sysmon, true}], "~s~n~p~n~p", [WarnMsg, ProcInfo, PortInfo])).
|
||||
|
||||
%% @doc Start system monitor
|
||||
-spec start_link(Opts :: list(tuple())) ->
|
||||
{ok, pid()} | ignore | {error, term()}.
|
||||
-spec(start_link(Opts :: list(tuple())) ->
|
||||
{ok, pid()} | ignore | {error, term()}).
|
||||
start_link(Opts) ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
|
||||
|
||||
|
|
|
@ -33,7 +33,7 @@
|
|||
-define(MAX_TOPIC_LEN, 4096).
|
||||
|
||||
%% @doc Is wildcard topic?
|
||||
-spec wildcard(topic() | words()) -> true | false.
|
||||
-spec(wildcard(topic() | words()) -> true | false).
|
||||
wildcard(Topic) when is_binary(Topic) ->
|
||||
wildcard(words(Topic));
|
||||
wildcard([]) ->
|
||||
|
@ -46,9 +46,9 @@ wildcard([_H|T]) ->
|
|||
wildcard(T).
|
||||
|
||||
%% @doc Match Topic name with filter
|
||||
-spec match(Name, Filter) -> boolean() when
|
||||
Name :: topic() | words(),
|
||||
Filter :: topic() | words().
|
||||
-spec(match(Name, Filter) -> boolean() when
|
||||
Name :: topic() | words(),
|
||||
Filter :: topic() | words()).
|
||||
match(Name, Filter) when is_binary(Name) and is_binary(Filter) ->
|
||||
match(words(Name), words(Filter));
|
||||
match([], []) ->
|
||||
|
@ -71,7 +71,7 @@ match([], [_H|_T2]) ->
|
|||
false.
|
||||
|
||||
%% @doc Validate Topic
|
||||
-spec validate({name | filter, topic()}) -> boolean().
|
||||
-spec(validate({name | filter, topic()}) -> boolean()).
|
||||
validate({_, <<>>}) ->
|
||||
false;
|
||||
validate({_, Topic}) when is_binary(Topic) and (size(Topic) > ?MAX_TOPIC_LEN) ->
|
||||
|
@ -106,7 +106,7 @@ validate3(<<_/utf8, Rest/binary>>) ->
|
|||
validate3(Rest).
|
||||
|
||||
%% @doc Topic to Triples
|
||||
-spec triples(topic()) -> list(triple()).
|
||||
-spec(triples(topic()) -> list(triple())).
|
||||
triples(Topic) when is_binary(Topic) ->
|
||||
triples(words(Topic), root, []).
|
||||
|
||||
|
@ -128,7 +128,7 @@ bin('#') -> <<"#">>;
|
|||
bin(B) when is_binary(B) -> B.
|
||||
|
||||
%% @doc Split Topic Path to Words
|
||||
-spec words(topic()) -> words().
|
||||
-spec(words(topic()) -> words()).
|
||||
words(Topic) when is_binary(Topic) ->
|
||||
[word(W) || W <- binary:split(Topic, <<"/">>, [global])].
|
||||
|
||||
|
@ -138,7 +138,7 @@ word(<<"#">>) -> '#';
|
|||
word(Bin) -> Bin.
|
||||
|
||||
%% @doc Queue is a special topic name that starts with "$queue/"
|
||||
-spec is_queue(topic()) -> boolean().
|
||||
-spec(is_queue(topic()) -> boolean()).
|
||||
is_queue(<<"$queue/", _Queue/binary>>) ->
|
||||
true;
|
||||
is_queue(_) ->
|
||||
|
@ -151,7 +151,7 @@ systop(Name) when is_atom(Name) ->
|
|||
systop(Name) when is_binary(Name) ->
|
||||
list_to_binary(["$SYS/brokers/", atom_to_list(node()), "/", Name]).
|
||||
|
||||
-spec feed_var(binary(), binary(), binary()) -> binary().
|
||||
-spec(feed_var(binary(), binary(), binary()) -> binary()).
|
||||
feed_var(Var, Val, Topic) ->
|
||||
feed_var(Var, Val, words(Topic), []).
|
||||
feed_var(_Var, _Val, [], Acc) ->
|
||||
|
@ -161,7 +161,7 @@ feed_var(Var, Val, [Var|Words], Acc) ->
|
|||
feed_var(Var, Val, [W|Words], Acc) ->
|
||||
feed_var(Var, Val, Words, [W|Acc]).
|
||||
|
||||
-spec join(list(binary())) -> binary().
|
||||
-spec(join(list(binary())) -> binary()).
|
||||
join([]) ->
|
||||
<<>>;
|
||||
join([W]) ->
|
||||
|
|
|
@ -42,12 +42,12 @@
|
|||
%% API
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec start_link() -> {ok, pid()}.
|
||||
-spec(start_link() -> {ok, pid()}).
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
|
||||
%% @doc Start to trace client or topic.
|
||||
-spec start_trace(trace_who(), string()) -> ok | {error, any()}.
|
||||
-spec(start_trace(trace_who(), string()) -> ok | {error, any()}).
|
||||
start_trace({client, ClientId}, LogFile) ->
|
||||
start_trace({start_trace, {client, ClientId}, LogFile});
|
||||
|
||||
|
@ -57,14 +57,14 @@ start_trace({topic, Topic}, LogFile) ->
|
|||
start_trace(Req) -> gen_server:call(?MODULE, Req, infinity).
|
||||
|
||||
%% @doc Stop tracing client or topic.
|
||||
-spec stop_trace(trace_who()) -> ok | {error, any()}.
|
||||
-spec(stop_trace(trace_who()) -> ok | {error, any()}).
|
||||
stop_trace({client, ClientId}) ->
|
||||
gen_server:call(?MODULE, {stop_trace, {client, ClientId}});
|
||||
stop_trace({topic, Topic}) ->
|
||||
gen_server:call(?MODULE, {stop_trace, {topic, Topic}}).
|
||||
|
||||
%% @doc Lookup all traces.
|
||||
-spec all_traces() -> [{Who :: trace_who(), LogFile :: string()}].
|
||||
-spec(all_traces() -> [{Who :: trace_who(), LogFile :: string()}]).
|
||||
all_traces() -> gen_server:call(?MODULE, all_traces).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -35,7 +35,7 @@
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Create or Replicate trie tables.
|
||||
-spec mnesia(boot | copy) -> ok.
|
||||
-spec(mnesia(boot | copy) -> ok).
|
||||
mnesia(boot) ->
|
||||
%% Trie Table
|
||||
ok = emqttd_mnesia:create_table(trie, [
|
||||
|
@ -59,7 +59,7 @@ mnesia(copy) ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Insert topic to trie
|
||||
-spec insert(Topic :: binary()) -> ok.
|
||||
-spec(insert(Topic :: binary()) -> ok).
|
||||
insert(Topic) when is_binary(Topic) ->
|
||||
case mnesia:read(trie_node, Topic) of
|
||||
[#trie_node{topic=Topic}] ->
|
||||
|
@ -74,18 +74,18 @@ insert(Topic) when is_binary(Topic) ->
|
|||
end.
|
||||
|
||||
%% @doc Find trie nodes that match topic
|
||||
-spec match(Topic :: binary()) -> list(MatchedTopic :: binary()).
|
||||
-spec(match(Topic :: binary()) -> list(MatchedTopic :: binary())).
|
||||
match(Topic) when is_binary(Topic) ->
|
||||
TrieNodes = match_node(root, emqttd_topic:words(Topic)),
|
||||
[Name || #trie_node{topic=Name} <- TrieNodes, Name =/= undefined].
|
||||
|
||||
%% @doc Lookup a Trie Node
|
||||
-spec lookup(NodeId :: binary()) -> [#trie_node{}].
|
||||
-spec(lookup(NodeId :: binary()) -> [#trie_node{}]).
|
||||
lookup(NodeId) ->
|
||||
mnesia:read(trie_node, NodeId).
|
||||
|
||||
%% @doc Delete topic from trie
|
||||
-spec delete(Topic :: binary()) -> ok.
|
||||
-spec(delete(Topic :: binary()) -> ok).
|
||||
delete(Topic) when is_binary(Topic) ->
|
||||
case mnesia:read(trie_node, Topic) of
|
||||
[#trie_node{edge_count=0}] ->
|
||||
|
|
Loading…
Reference in New Issue