Fix the function spec: '{error, any()}' -> '{error, term()}'
This commit is contained in:
parent
d41fd94abb
commit
11a41166d2
|
@ -136,17 +136,17 @@ subscribed(Topic, Subscriber) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec(hook(atom(), function() | {emqttd_hooks:hooktag(), function()}, list(any()))
|
-spec(hook(atom(), function() | {emqttd_hooks:hooktag(), function()}, list(any()))
|
||||||
-> ok | {error, any()}).
|
-> ok | {error, term()}).
|
||||||
hook(Hook, TagFunction, InitArgs) ->
|
hook(Hook, TagFunction, InitArgs) ->
|
||||||
emqttd_hooks:add(Hook, TagFunction, InitArgs).
|
emqttd_hooks:add(Hook, TagFunction, InitArgs).
|
||||||
|
|
||||||
-spec(hook(atom(), function() | {emqttd_hooks:hooktag(), function()}, list(any()), integer())
|
-spec(hook(atom(), function() | {emqttd_hooks:hooktag(), function()}, list(any()), integer())
|
||||||
-> ok | {error, any()}).
|
-> ok | {error, term()}).
|
||||||
hook(Hook, TagFunction, InitArgs, Priority) ->
|
hook(Hook, TagFunction, InitArgs, Priority) ->
|
||||||
emqttd_hooks:add(Hook, TagFunction, InitArgs, Priority).
|
emqttd_hooks:add(Hook, TagFunction, InitArgs, Priority).
|
||||||
|
|
||||||
-spec(unhook(atom(), function() | {emqttd_hooks:hooktag(), function()})
|
-spec(unhook(atom(), function() | {emqttd_hooks:hooktag(), function()})
|
||||||
-> ok | {error, any()}).
|
-> ok | {error, term()}).
|
||||||
unhook(Hook, TagFunction) ->
|
unhook(Hook, TagFunction) ->
|
||||||
emqttd_hooks:delete(Hook, TagFunction).
|
emqttd_hooks:delete(Hook, TagFunction).
|
||||||
|
|
||||||
|
|
|
@ -43,12 +43,12 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
%% @doc Start access control server.
|
%% @doc Start access control server.
|
||||||
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
|
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
|
||||||
start_link() ->
|
start_link() ->
|
||||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||||
|
|
||||||
%% @doc Authenticate MQTT Client.
|
%% @doc Authenticate MQTT Client.
|
||||||
-spec(auth(Client :: mqtt_client(), Password :: password()) -> ok | {ok, boolean()} | {error, any()}).
|
-spec(auth(Client :: mqtt_client(), Password :: password()) -> ok | {ok, boolean()} | {error, term()}).
|
||||||
auth(Client, Password) when is_record(Client, mqtt_client) ->
|
auth(Client, Password) when is_record(Client, mqtt_client) ->
|
||||||
auth(Client, Password, lookup_mods(auth)).
|
auth(Client, Password, lookup_mods(auth)).
|
||||||
auth(_Client, _Password, []) ->
|
auth(_Client, _Password, []) ->
|
||||||
|
@ -88,16 +88,16 @@ reload_acl() ->
|
||||||
[Mod:reload_acl(State) || {Mod, State, _Seq} <- lookup_mods(acl)].
|
[Mod:reload_acl(State) || {Mod, State, _Seq} <- lookup_mods(acl)].
|
||||||
|
|
||||||
%% @doc Register Authentication or ACL module.
|
%% @doc Register Authentication or ACL module.
|
||||||
-spec(register_mod(auth | acl, atom(), list()) -> ok | {error, any()}).
|
-spec(register_mod(auth | acl, atom(), list()) -> ok | {error, term()}).
|
||||||
register_mod(Type, Mod, Opts) when Type =:= auth; Type =:= acl->
|
register_mod(Type, Mod, Opts) when Type =:= auth; Type =:= acl->
|
||||||
register_mod(Type, Mod, Opts, 0).
|
register_mod(Type, Mod, Opts, 0).
|
||||||
|
|
||||||
-spec(register_mod(auth | acl, atom(), list(), non_neg_integer()) -> ok | {error, any()}).
|
-spec(register_mod(auth | acl, atom(), list(), non_neg_integer()) -> ok | {error, term()}).
|
||||||
register_mod(Type, Mod, Opts, Seq) when Type =:= auth; Type =:= acl->
|
register_mod(Type, Mod, Opts, Seq) when Type =:= auth; Type =:= acl->
|
||||||
gen_server:call(?SERVER, {register_mod, Type, Mod, Opts, Seq}).
|
gen_server:call(?SERVER, {register_mod, Type, Mod, Opts, Seq}).
|
||||||
|
|
||||||
%% @doc Unregister authentication or ACL module
|
%% @doc Unregister authentication or ACL module
|
||||||
-spec(unregister_mod(Type :: auth | acl, Mod :: atom()) -> ok | {error, any()}).
|
-spec(unregister_mod(Type :: auth | acl, Mod :: atom()) -> ok | {error, not_found | term()}).
|
||||||
unregister_mod(Type, Mod) when Type =:= auth; Type =:= acl ->
|
unregister_mod(Type, Mod) when Type =:= auth; Type =:= acl ->
|
||||||
gen_server:call(?SERVER, {unregister_mod, Type, Mod}).
|
gen_server:call(?SERVER, {unregister_mod, Type, Mod}).
|
||||||
|
|
||||||
|
|
|
@ -32,7 +32,7 @@
|
||||||
PubSub :: pubsub(),
|
PubSub :: pubsub(),
|
||||||
Topic :: binary()}, State :: any()) -> allow | deny | ignore).
|
Topic :: binary()}, State :: any()) -> allow | deny | ignore).
|
||||||
|
|
||||||
-callback(reload_acl(State :: any()) -> ok | {error, any()}).
|
-callback(reload_acl(State :: any()) -> ok | {error, term()}).
|
||||||
|
|
||||||
-callback(description() -> string()).
|
-callback(description() -> string()).
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,7 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
%% @doc Start bridge pool supervisor
|
%% @doc Start bridge pool supervisor
|
||||||
-spec(start_link(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, any()}).
|
-spec(start_link(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, term()}).
|
||||||
start_link(Node, Topic, Options) ->
|
start_link(Node, Topic, Options) ->
|
||||||
MFA = {emqttd_bridge, start_link, [Node, Topic, Options]},
|
MFA = {emqttd_bridge, start_link, [Node, Topic, Options]},
|
||||||
emqttd_pool_sup:start_link({bridge, Node, Topic}, random, MFA).
|
emqttd_pool_sup:start_link({bridge, Node, Topic}, random, MFA).
|
||||||
|
|
|
@ -40,11 +40,11 @@ bridges() ->
|
||||||
<- supervisor:which_children(?MODULE)].
|
<- supervisor:which_children(?MODULE)].
|
||||||
|
|
||||||
%% @doc Start a bridge
|
%% @doc Start a bridge
|
||||||
-spec(start_bridge(atom(), binary()) -> {ok, pid()} | {error, any()}).
|
-spec(start_bridge(atom(), binary()) -> {ok, pid()} | {error, term()}).
|
||||||
start_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) ->
|
start_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) ->
|
||||||
start_bridge(Node, Topic, []).
|
start_bridge(Node, Topic, []).
|
||||||
|
|
||||||
-spec(start_bridge(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, any()}).
|
-spec(start_bridge(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, term()}).
|
||||||
start_bridge(Node, _Topic, _Options) when Node =:= node() ->
|
start_bridge(Node, _Topic, _Options) when Node =:= node() ->
|
||||||
{error, bridge_to_self};
|
{error, bridge_to_self};
|
||||||
start_bridge(Node, Topic, Options) when is_atom(Node) andalso is_binary(Topic) ->
|
start_bridge(Node, Topic, Options) when is_atom(Node) andalso is_binary(Topic) ->
|
||||||
|
|
|
@ -61,7 +61,7 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
%% @doc Start emqttd broker
|
%% @doc Start emqttd broker
|
||||||
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
|
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
|
||||||
start_link() ->
|
start_link() ->
|
||||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||||
|
|
||||||
|
|
|
@ -248,7 +248,6 @@ subscriptions(["show", ClientId]) ->
|
||||||
Records -> [print(subscription, Subscription) || Subscription <- Records]
|
Records -> [print(subscription, Subscription) || Subscription <- Records]
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
|
||||||
subscriptions(["add", ClientId, Topic, QoS]) ->
|
subscriptions(["add", ClientId, Topic, QoS]) ->
|
||||||
Add = fun(IntQos) ->
|
Add = fun(IntQos) ->
|
||||||
case emqttd:subscribe(bin(Topic), bin(ClientId), [{qos, IntQos}]) of
|
case emqttd:subscribe(bin(Topic), bin(ClientId), [{qos, IntQos}]) of
|
||||||
|
@ -260,22 +259,14 @@ subscriptions(["add", ClientId, Topic, QoS]) ->
|
||||||
end,
|
end,
|
||||||
if_valid_qos(QoS, Add);
|
if_valid_qos(QoS, Add);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
subscriptions(["del", ClientId]) ->
|
|
||||||
Ok = emqttd:subscriber_down(bin(ClientId)),
|
|
||||||
?PRINT("~p~n", [Ok]);
|
|
||||||
|
|
||||||
subscriptions(["del", ClientId, Topic]) ->
|
subscriptions(["del", ClientId, Topic]) ->
|
||||||
Ok = emqttd:unsubscribe(bin(Topic), bin(ClientId)),
|
Ok = emqttd:unsubscribe(bin(Topic), bin(ClientId)),
|
||||||
?PRINT("~p~n", [Ok]);
|
?PRINT("~p~n", [Ok]);
|
||||||
|
|
||||||
|
|
||||||
subscriptions(_) ->
|
subscriptions(_) ->
|
||||||
?USAGE([{"subscriptions list", "List all subscriptions"},
|
?USAGE([{"subscriptions list", "List all subscriptions"},
|
||||||
{"subscriptions show <ClientId>", "Show subscriptions of a client"},
|
{"subscriptions show <ClientId>", "Show subscriptions of a client"},
|
||||||
{"subscriptions add <ClientId> <Topic> <QoS>", "Add a static subscription manually"},
|
{"subscriptions add <ClientId> <Topic> <QoS>", "Add a static subscription manually"},
|
||||||
{"subscriptions del <ClientId>", "Delete static subscriptions manually"},
|
|
||||||
{"subscriptions del <ClientId> <Topic>", "Delete a static subscription manually"}]).
|
{"subscriptions del <ClientId> <Topic>", "Delete a static subscription manually"}]).
|
||||||
|
|
||||||
% if_could_print(Tab, Fun) ->
|
% if_could_print(Tab, Fun) ->
|
||||||
|
|
|
@ -47,7 +47,7 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
%% @doc Start Client Manager
|
%% @doc Start Client Manager
|
||||||
-spec(start_link(atom(), pos_integer(), fun()) -> {ok, pid()} | ignore | {error, any()}).
|
-spec(start_link(atom(), pos_integer(), fun()) -> {ok, pid()} | ignore | {error, term()}).
|
||||||
start_link(Pool, Id, StatsFun) ->
|
start_link(Pool, Id, StatsFun) ->
|
||||||
gen_server2:start_link(?MODULE, [Pool, Id, StatsFun], []).
|
gen_server2:start_link(?MODULE, [Pool, Id, StatsFun], []).
|
||||||
|
|
||||||
|
|
|
@ -24,9 +24,9 @@
|
||||||
|
|
||||||
-ifdef(use_specs).
|
-ifdef(use_specs).
|
||||||
|
|
||||||
-callback(load(Opts :: any()) -> ok | {error, any()}).
|
-callback(load(Opts :: any()) -> ok | {error, term()}).
|
||||||
|
|
||||||
-callback(unload(State :: any()) -> any()).
|
-callback(unload(State :: term()) -> term()).
|
||||||
|
|
||||||
-else.
|
-else.
|
||||||
|
|
||||||
|
|
|
@ -29,7 +29,7 @@
|
||||||
-export_type([keepalive/0]).
|
-export_type([keepalive/0]).
|
||||||
|
|
||||||
%% @doc Start a keepalive
|
%% @doc Start a keepalive
|
||||||
-spec(start(fun(), integer(), any()) -> {ok, keepalive()} | {error, any()}).
|
-spec(start(fun(), integer(), any()) -> {ok, keepalive()} | {error, term()}).
|
||||||
start(_, 0, _) ->
|
start(_, 0, _) ->
|
||||||
{ok, #keepalive{}};
|
{ok, #keepalive{}};
|
||||||
start(StatFun, TimeoutSec, TimeoutMsg) ->
|
start(StatFun, TimeoutSec, TimeoutMsg) ->
|
||||||
|
@ -43,7 +43,7 @@ start(StatFun, TimeoutSec, TimeoutMsg) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @doc Check keepalive, called when timeout.
|
%% @doc Check keepalive, called when timeout.
|
||||||
-spec(check(keepalive()) -> {ok, keepalive()} | {error, any()}).
|
-spec(check(keepalive()) -> {ok, keepalive()} | {error, term()}).
|
||||||
check(KeepAlive = #keepalive{statfun = StatFun, statval = LastVal, repeat = Repeat}) ->
|
check(KeepAlive = #keepalive{statfun = StatFun, statval = LastVal, repeat = Repeat}) ->
|
||||||
case StatFun() of
|
case StatFun() of
|
||||||
{ok, NewVal} ->
|
{ok, NewVal} ->
|
||||||
|
|
|
@ -96,8 +96,8 @@
|
||||||
%% API
|
%% API
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
%% @doc Start metrics server
|
%% @doc Start the metrics server
|
||||||
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
|
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
|
||||||
start_link() ->
|
start_link() ->
|
||||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||||
|
|
||||||
|
|
|
@ -42,7 +42,7 @@ start_child(ChildSpec) when is_tuple(ChildSpec) ->
|
||||||
start_child(Mod, Type) when is_atom(Mod) andalso is_atom(Type) ->
|
start_child(Mod, Type) when is_atom(Mod) andalso is_atom(Type) ->
|
||||||
supervisor:start_child(?MODULE, ?CHILD(Mod, Type)).
|
supervisor:start_child(?MODULE, ?CHILD(Mod, Type)).
|
||||||
|
|
||||||
-spec(stop_child(any()) -> ok | {error, any()}).
|
-spec(stop_child(any()) -> ok | {error, term()}).
|
||||||
stop_child(ChildId) ->
|
stop_child(ChildId) ->
|
||||||
case supervisor:terminate_child(?MODULE, ChildId) of
|
case supervisor:terminate_child(?MODULE, ChildId) of
|
||||||
ok -> supervisor:delete_child(?MODULE, ChildId);
|
ok -> supervisor:delete_child(?MODULE, ChildId);
|
||||||
|
|
|
@ -39,7 +39,7 @@ initial_state(MaxSize) ->
|
||||||
|
|
||||||
%% @doc Parse MQTT Packet
|
%% @doc Parse MQTT Packet
|
||||||
-spec(parse(binary(), {none, pos_integer()} | fun())
|
-spec(parse(binary(), {none, pos_integer()} | fun())
|
||||||
-> {ok, mqtt_packet()} | {error, any()} | {more, fun()}).
|
-> {ok, mqtt_packet()} | {error, term()} | {more, fun()}).
|
||||||
parse(<<>>, {none, MaxLen}) ->
|
parse(<<>>, {none, MaxLen}) ->
|
||||||
{more, fun(Bin) -> parse(Bin, {none, MaxLen}) end};
|
{more, fun(Bin) -> parse(Bin, {none, MaxLen}) end};
|
||||||
parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Limit}) ->
|
parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Limit}) ->
|
||||||
|
|
|
@ -47,7 +47,7 @@ init_config(CfgFile) ->
|
||||||
end, AppsEnv).
|
end, AppsEnv).
|
||||||
|
|
||||||
%% @doc Load all plugins when the broker started.
|
%% @doc Load all plugins when the broker started.
|
||||||
-spec(load() -> list() | {error, any()}).
|
-spec(load() -> list() | {error, term()}).
|
||||||
load() ->
|
load() ->
|
||||||
case emqttd:env(plugins_loaded_file) of
|
case emqttd:env(plugins_loaded_file) of
|
||||||
{ok, File} ->
|
{ok, File} ->
|
||||||
|
@ -80,7 +80,7 @@ load_plugins(Names, Persistent) ->
|
||||||
[load_plugin(find_plugin(Name, Plugins), Persistent) || Name <- NeedToLoad].
|
[load_plugin(find_plugin(Name, Plugins), Persistent) || Name <- NeedToLoad].
|
||||||
|
|
||||||
%% @doc Unload all plugins before broker stopped.
|
%% @doc Unload all plugins before broker stopped.
|
||||||
-spec(unload() -> list() | {error, any()}).
|
-spec(unload() -> list() | {error, term()}).
|
||||||
unload() ->
|
unload() ->
|
||||||
case emqttd:env(plugins_loaded_file) of
|
case emqttd:env(plugins_loaded_file) of
|
||||||
{ok, File} ->
|
{ok, File} ->
|
||||||
|
@ -119,7 +119,7 @@ plugin(CfgFile) ->
|
||||||
#mqtt_plugin{name = AppName, version = Ver, descr = Descr}.
|
#mqtt_plugin{name = AppName, version = Ver, descr = Descr}.
|
||||||
|
|
||||||
%% @doc Load a Plugin
|
%% @doc Load a Plugin
|
||||||
-spec(load(atom()) -> ok | {error, any()}).
|
-spec(load(atom()) -> ok | {error, term()}).
|
||||||
load(PluginName) when is_atom(PluginName) ->
|
load(PluginName) when is_atom(PluginName) ->
|
||||||
case lists:member(PluginName, names(started_app)) of
|
case lists:member(PluginName, names(started_app)) of
|
||||||
true ->
|
true ->
|
||||||
|
@ -172,7 +172,7 @@ find_plugin(Name, Plugins) ->
|
||||||
lists:keyfind(Name, 2, Plugins).
|
lists:keyfind(Name, 2, Plugins).
|
||||||
|
|
||||||
%% @doc UnLoad a Plugin
|
%% @doc UnLoad a Plugin
|
||||||
-spec(unload(atom()) -> ok | {error, any()}).
|
-spec(unload(atom()) -> ok | {error, term()}).
|
||||||
unload(PluginName) when is_atom(PluginName) ->
|
unload(PluginName) when is_atom(PluginName) ->
|
||||||
case {lists:member(PluginName, names(started_app)), lists:member(PluginName, names(plugin))} of
|
case {lists:member(PluginName, names(started_app)), lists:member(PluginName, names(plugin))} of
|
||||||
{true, true} ->
|
{true, true} ->
|
||||||
|
|
|
@ -36,12 +36,12 @@ spec(ChildId, Args) ->
|
||||||
{ChildId, {?MODULE, start_link, Args},
|
{ChildId, {?MODULE, start_link, Args},
|
||||||
transient, infinity, supervisor, [?MODULE]}.
|
transient, infinity, supervisor, [?MODULE]}.
|
||||||
|
|
||||||
-spec(start_link(atom() | tuple(), atom(), mfa()) -> {ok, pid()} | {error, any()}).
|
-spec(start_link(atom() | tuple(), atom(), mfa()) -> {ok, pid()} | {error, term()}).
|
||||||
start_link(Pool, Type, MFA) ->
|
start_link(Pool, Type, MFA) ->
|
||||||
Schedulers = erlang:system_info(schedulers),
|
Schedulers = erlang:system_info(schedulers),
|
||||||
start_link(Pool, Type, Schedulers, MFA).
|
start_link(Pool, Type, Schedulers, MFA).
|
||||||
|
|
||||||
-spec(start_link(atom(), atom(), pos_integer(), mfa()) -> {ok, pid()} | {error, any()}).
|
-spec(start_link(atom(), atom(), pos_integer(), mfa()) -> {ok, pid()} | {error, term()}).
|
||||||
start_link(Pool, Type, Size, MFA) ->
|
start_link(Pool, Type, Size, MFA) ->
|
||||||
supervisor:start_link(?MODULE, [Pool, Type, Size, MFA]).
|
supervisor:start_link(?MODULE, [Pool, Type, Size, MFA]).
|
||||||
|
|
||||||
|
|
|
@ -40,7 +40,7 @@ start_link() ->
|
||||||
%% API
|
%% API
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, any()}).
|
-spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}).
|
||||||
start_link(Pool, Id) ->
|
start_link(Pool, Id) ->
|
||||||
gen_server:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id], []).
|
gen_server:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id], []).
|
||||||
|
|
||||||
|
|
|
@ -124,7 +124,7 @@ session(#proto_state{session = Session}) ->
|
||||||
%% CONNECT – Client requests a connection to a Server
|
%% CONNECT – Client requests a connection to a Server
|
||||||
|
|
||||||
%% A Client can only send the CONNECT Packet once over a Network Connection.
|
%% A Client can only send the CONNECT Packet once over a Network Connection.
|
||||||
-spec(received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}).
|
-spec(received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, term()}).
|
||||||
received(Packet = ?PACKET(?CONNECT),
|
received(Packet = ?PACKET(?CONNECT),
|
||||||
State = #proto_state{connected = false, stats_data = Stats}) ->
|
State = #proto_state{connected = false, stats_data = Stats}) ->
|
||||||
trace(recv, Packet, State), Stats1 = inc_stats(recv, ?CONNECT, Stats),
|
trace(recv, Packet, State), Stats1 = inc_stats(recv, ?CONNECT, Stats),
|
||||||
|
|
|
@ -172,7 +172,7 @@
|
||||||
"Session(~s): " ++ Format, [State#state.client_id | Args])).
|
"Session(~s): " ++ Format, [State#state.client_id | Args])).
|
||||||
|
|
||||||
%% @doc Start a Session
|
%% @doc Start a Session
|
||||||
-spec(start_link(boolean(), {mqtt_client_id(), mqtt_username()}, pid()) -> {ok, pid()} | {error, any()}).
|
-spec(start_link(boolean(), {mqtt_client_id(), mqtt_username()}, pid()) -> {ok, pid()} | {error, term()}).
|
||||||
start_link(CleanSess, {ClientId, Username}, ClientPid) ->
|
start_link(CleanSess, {ClientId, Username}, ClientPid) ->
|
||||||
gen_server2:start_link(?MODULE, [CleanSess, {ClientId, Username}, ClientPid], []).
|
gen_server2:start_link(?MODULE, [CleanSess, {ClientId, Username}, ClientPid], []).
|
||||||
|
|
||||||
|
@ -192,7 +192,7 @@ subscribe(Session, PacketId, TopicTable) -> %%TODO: the ack function??...
|
||||||
gen_server2:cast(Session, {subscribe, From, TopicTable, AckFun}).
|
gen_server2:cast(Session, {subscribe, From, TopicTable, AckFun}).
|
||||||
|
|
||||||
%% @doc Publish Message
|
%% @doc Publish Message
|
||||||
-spec(publish(pid(), mqtt_message()) -> ok | {error, any()}).
|
-spec(publish(pid(), mqtt_message()) -> ok | {error, term()}).
|
||||||
publish(_Session, Msg = #mqtt_message{qos = ?QOS_0}) ->
|
publish(_Session, Msg = #mqtt_message{qos = ?QOS_0}) ->
|
||||||
%% Publish QoS0 Directly
|
%% Publish QoS0 Directly
|
||||||
emqttd_server:publish(Msg), ok;
|
emqttd_server:publish(Msg), ok;
|
||||||
|
@ -582,9 +582,9 @@ handle_info(Info, Session) ->
|
||||||
?UNEXPECTED_INFO(Info, Session).
|
?UNEXPECTED_INFO(Info, Session).
|
||||||
|
|
||||||
terminate(Reason, #state{client_id = ClientId, username = Username}) ->
|
terminate(Reason, #state{client_id = ClientId, username = Username}) ->
|
||||||
emqttd_stats:del_session_stats(ClientId),
|
%% Move to emqttd_sm to avoid race condition
|
||||||
|
%%emqttd_stats:del_session_stats(ClientId),
|
||||||
emqttd_hooks:run('session.terminated', [ClientId, Username, Reason]),
|
emqttd_hooks:run('session.terminated', [ClientId, Username, Reason]),
|
||||||
emqttd_server:subscriber_down(ClientId),
|
|
||||||
emqttd_sm:unregister_session(ClientId).
|
emqttd_sm:unregister_session(ClientId).
|
||||||
|
|
||||||
code_change(_OldVsn, Session, _Extra) ->
|
code_change(_OldVsn, Session, _Extra) ->
|
||||||
|
|
|
@ -76,12 +76,12 @@ mnesia(copy) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
%% @doc Start a session manager
|
%% @doc Start a session manager
|
||||||
-spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, any()}).
|
-spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}).
|
||||||
start_link(Pool, Id) ->
|
start_link(Pool, Id) ->
|
||||||
gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id], []).
|
gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id], []).
|
||||||
|
|
||||||
%% @doc Start a session
|
%% @doc Start a session
|
||||||
-spec(start_session(boolean(), {binary(), binary() | undefined}) -> {ok, pid(), boolean()} | {error, any()}).
|
-spec(start_session(boolean(), {binary(), binary() | undefined}) -> {ok, pid(), boolean()} | {error, term()}).
|
||||||
start_session(CleanSess, {ClientId, Username}) ->
|
start_session(CleanSess, {ClientId, Username}) ->
|
||||||
SM = gproc_pool:pick_worker(?POOL, ClientId),
|
SM = gproc_pool:pick_worker(?POOL, ClientId),
|
||||||
call(SM, {start_session, CleanSess, {ClientId, Username}, self()}).
|
call(SM, {start_session, CleanSess, {ClientId, Username}, self()}).
|
||||||
|
@ -107,6 +107,7 @@ unregister_session(ClientId) ->
|
||||||
unregister_session(ClientId, Pid) ->
|
unregister_session(ClientId, Pid) ->
|
||||||
case ets:lookup(mqtt_local_session, ClientId) of
|
case ets:lookup(mqtt_local_session, ClientId) of
|
||||||
[LocalSess = {_, Pid, _, _}] ->
|
[LocalSess = {_, Pid, _, _}] ->
|
||||||
|
emqttd_stats:del_session_stats(ClientId),
|
||||||
ets:delete_object(mqtt_local_session, LocalSess);
|
ets:delete_object(mqtt_local_session, LocalSess);
|
||||||
_ ->
|
_ ->
|
||||||
false
|
false
|
||||||
|
@ -187,7 +188,6 @@ handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
|
||||||
[] ->
|
[] ->
|
||||||
ok;
|
ok;
|
||||||
[Sess = #mqtt_session{sess_pid = DownPid}] ->
|
[Sess = #mqtt_session{sess_pid = DownPid}] ->
|
||||||
emqttd_stats:del_session_stats(ClientId),
|
|
||||||
mnesia:delete_object(mqtt_session, Sess, write);
|
mnesia:delete_object(mqtt_session, Sess, write);
|
||||||
[_Sess] ->
|
[_Sess] ->
|
||||||
ok
|
ok
|
||||||
|
|
|
@ -39,7 +39,7 @@
|
||||||
-define(LOCK, {?MODULE, clean_sessions}).
|
-define(LOCK, {?MODULE, clean_sessions}).
|
||||||
|
|
||||||
%% @doc Start a session helper
|
%% @doc Start a session helper
|
||||||
-spec(start_link(fun()) -> {ok, pid()} | ignore | {error, any()}).
|
-spec(start_link(fun()) -> {ok, pid()} | ignore | {error, term()}).
|
||||||
start_link(StatsFun) ->
|
start_link(StatsFun) ->
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [StatsFun], []).
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [StatsFun], []).
|
||||||
|
|
||||||
|
|
|
@ -47,7 +47,7 @@ start_link() ->
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||||
|
|
||||||
%% @doc Start to trace client or topic.
|
%% @doc Start to trace client or topic.
|
||||||
-spec(start_trace(trace_who(), string()) -> ok | {error, any()}).
|
-spec(start_trace(trace_who(), string()) -> ok | {error, term()}).
|
||||||
start_trace({client, ClientId}, LogFile) ->
|
start_trace({client, ClientId}, LogFile) ->
|
||||||
start_trace({start_trace, {client, ClientId}, LogFile});
|
start_trace({start_trace, {client, ClientId}, LogFile});
|
||||||
|
|
||||||
|
@ -57,7 +57,7 @@ start_trace({topic, Topic}, LogFile) ->
|
||||||
start_trace(Req) -> gen_server:call(?MODULE, Req, infinity).
|
start_trace(Req) -> gen_server:call(?MODULE, Req, infinity).
|
||||||
|
|
||||||
%% @doc Stop tracing client or topic.
|
%% @doc Stop tracing client or topic.
|
||||||
-spec(stop_trace(trace_who()) -> ok | {error, any()}).
|
-spec(stop_trace(trace_who()) -> ok | {error, term()}).
|
||||||
stop_trace({client, ClientId}) ->
|
stop_trace({client, ClientId}) ->
|
||||||
gen_server:call(?MODULE, {stop_trace, {client, ClientId}});
|
gen_server:call(?MODULE, {stop_trace, {client, ClientId}});
|
||||||
stop_trace({topic, Topic}) ->
|
stop_trace({topic, Topic}) ->
|
||||||
|
|
Loading…
Reference in New Issue