diff --git a/src/emqttd.erl b/src/emqttd.erl index f012fdc23..65739952f 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -136,17 +136,17 @@ subscribed(Topic, Subscriber) -> %%-------------------------------------------------------------------- -spec(hook(atom(), function() | {emqttd_hooks:hooktag(), function()}, list(any())) - -> ok | {error, any()}). + -> ok | {error, term()}). hook(Hook, TagFunction, InitArgs) -> emqttd_hooks:add(Hook, TagFunction, InitArgs). -spec(hook(atom(), function() | {emqttd_hooks:hooktag(), function()}, list(any()), integer()) - -> ok | {error, any()}). + -> ok | {error, term()}). hook(Hook, TagFunction, InitArgs, Priority) -> emqttd_hooks:add(Hook, TagFunction, InitArgs, Priority). -spec(unhook(atom(), function() | {emqttd_hooks:hooktag(), function()}) - -> ok | {error, any()}). + -> ok | {error, term()}). unhook(Hook, TagFunction) -> emqttd_hooks:delete(Hook, TagFunction). diff --git a/src/emqttd_access_control.erl b/src/emqttd_access_control.erl index 6cfcc03cf..601fd263f 100644 --- a/src/emqttd_access_control.erl +++ b/src/emqttd_access_control.erl @@ -43,12 +43,12 @@ %%-------------------------------------------------------------------- %% @doc Start access control server. --spec(start_link() -> {ok, pid()} | ignore | {error, any()}). +-spec(start_link() -> {ok, pid()} | ignore | {error, term()}). start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). %% @doc Authenticate MQTT Client. --spec(auth(Client :: mqtt_client(), Password :: password()) -> ok | {ok, boolean()} | {error, any()}). +-spec(auth(Client :: mqtt_client(), Password :: password()) -> ok | {ok, boolean()} | {error, term()}). auth(Client, Password) when is_record(Client, mqtt_client) -> auth(Client, Password, lookup_mods(auth)). auth(_Client, _Password, []) -> @@ -88,16 +88,16 @@ reload_acl() -> [Mod:reload_acl(State) || {Mod, State, _Seq} <- lookup_mods(acl)]. %% @doc Register Authentication or ACL module. --spec(register_mod(auth | acl, atom(), list()) -> ok | {error, any()}). +-spec(register_mod(auth | acl, atom(), list()) -> ok | {error, term()}). register_mod(Type, Mod, Opts) when Type =:= auth; Type =:= acl-> register_mod(Type, Mod, Opts, 0). --spec(register_mod(auth | acl, atom(), list(), non_neg_integer()) -> ok | {error, any()}). +-spec(register_mod(auth | acl, atom(), list(), non_neg_integer()) -> ok | {error, term()}). register_mod(Type, Mod, Opts, Seq) when Type =:= auth; Type =:= acl-> gen_server:call(?SERVER, {register_mod, Type, Mod, Opts, Seq}). %% @doc Unregister authentication or ACL module --spec(unregister_mod(Type :: auth | acl, Mod :: atom()) -> ok | {error, any()}). +-spec(unregister_mod(Type :: auth | acl, Mod :: atom()) -> ok | {error, not_found | term()}). unregister_mod(Type, Mod) when Type =:= auth; Type =:= acl -> gen_server:call(?SERVER, {unregister_mod, Type, Mod}). diff --git a/src/emqttd_acl_mod.erl b/src/emqttd_acl_mod.erl index 12e949afe..4ed07b369 100644 --- a/src/emqttd_acl_mod.erl +++ b/src/emqttd_acl_mod.erl @@ -32,7 +32,7 @@ PubSub :: pubsub(), Topic :: binary()}, State :: any()) -> allow | deny | ignore). --callback(reload_acl(State :: any()) -> ok | {error, any()}). +-callback(reload_acl(State :: any()) -> ok | {error, term()}). -callback(description() -> string()). diff --git a/src/emqttd_bridge_sup.erl b/src/emqttd_bridge_sup.erl index d28b2274c..75138332f 100644 --- a/src/emqttd_bridge_sup.erl +++ b/src/emqttd_bridge_sup.erl @@ -23,7 +23,7 @@ %%-------------------------------------------------------------------- %% @doc Start bridge pool supervisor --spec(start_link(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, any()}). +-spec(start_link(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, term()}). start_link(Node, Topic, Options) -> MFA = {emqttd_bridge, start_link, [Node, Topic, Options]}, emqttd_pool_sup:start_link({bridge, Node, Topic}, random, MFA). diff --git a/src/emqttd_bridge_sup_sup.erl b/src/emqttd_bridge_sup_sup.erl index 1d47bd003..11679aba8 100644 --- a/src/emqttd_bridge_sup_sup.erl +++ b/src/emqttd_bridge_sup_sup.erl @@ -40,11 +40,11 @@ bridges() -> <- supervisor:which_children(?MODULE)]. %% @doc Start a bridge --spec(start_bridge(atom(), binary()) -> {ok, pid()} | {error, any()}). +-spec(start_bridge(atom(), binary()) -> {ok, pid()} | {error, term()}). start_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) -> start_bridge(Node, Topic, []). --spec(start_bridge(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, any()}). +-spec(start_bridge(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, term()}). start_bridge(Node, _Topic, _Options) when Node =:= node() -> {error, bridge_to_self}; start_bridge(Node, Topic, Options) when is_atom(Node) andalso is_binary(Topic) -> diff --git a/src/emqttd_broker.erl b/src/emqttd_broker.erl index 9f939a45e..0161720f2 100644 --- a/src/emqttd_broker.erl +++ b/src/emqttd_broker.erl @@ -61,7 +61,7 @@ %%-------------------------------------------------------------------- %% @doc Start emqttd broker --spec(start_link() -> {ok, pid()} | ignore | {error, any()}). +-spec(start_link() -> {ok, pid()} | ignore | {error, term()}). start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index a4029d46f..c8836230f 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -248,7 +248,6 @@ subscriptions(["show", ClientId]) -> Records -> [print(subscription, Subscription) || Subscription <- Records] end; - subscriptions(["add", ClientId, Topic, QoS]) -> Add = fun(IntQos) -> case emqttd:subscribe(bin(Topic), bin(ClientId), [{qos, IntQos}]) of @@ -260,22 +259,14 @@ subscriptions(["add", ClientId, Topic, QoS]) -> end, if_valid_qos(QoS, Add); - - -subscriptions(["del", ClientId]) -> - Ok = emqttd:subscriber_down(bin(ClientId)), - ?PRINT("~p~n", [Ok]); - subscriptions(["del", ClientId, Topic]) -> Ok = emqttd:unsubscribe(bin(Topic), bin(ClientId)), ?PRINT("~p~n", [Ok]); - subscriptions(_) -> ?USAGE([{"subscriptions list", "List all subscriptions"}, {"subscriptions show ", "Show subscriptions of a client"}, {"subscriptions add ", "Add a static subscription manually"}, - {"subscriptions del ", "Delete static subscriptions manually"}, {"subscriptions del ", "Delete a static subscription manually"}]). % if_could_print(Tab, Fun) -> diff --git a/src/emqttd_cm.erl b/src/emqttd_cm.erl index 2e57ebe5b..c5f0ff145 100644 --- a/src/emqttd_cm.erl +++ b/src/emqttd_cm.erl @@ -47,7 +47,7 @@ %%-------------------------------------------------------------------- %% @doc Start Client Manager --spec(start_link(atom(), pos_integer(), fun()) -> {ok, pid()} | ignore | {error, any()}). +-spec(start_link(atom(), pos_integer(), fun()) -> {ok, pid()} | ignore | {error, term()}). start_link(Pool, Id, StatsFun) -> gen_server2:start_link(?MODULE, [Pool, Id, StatsFun], []). diff --git a/src/emqttd_gen_mod.erl b/src/emqttd_gen_mod.erl index 824b65f4c..f8d690024 100644 --- a/src/emqttd_gen_mod.erl +++ b/src/emqttd_gen_mod.erl @@ -24,9 +24,9 @@ -ifdef(use_specs). --callback(load(Opts :: any()) -> ok | {error, any()}). +-callback(load(Opts :: any()) -> ok | {error, term()}). --callback(unload(State :: any()) -> any()). +-callback(unload(State :: term()) -> term()). -else. diff --git a/src/emqttd_keepalive.erl b/src/emqttd_keepalive.erl index 5d26ea8a6..a0458038a 100644 --- a/src/emqttd_keepalive.erl +++ b/src/emqttd_keepalive.erl @@ -29,7 +29,7 @@ -export_type([keepalive/0]). %% @doc Start a keepalive --spec(start(fun(), integer(), any()) -> {ok, keepalive()} | {error, any()}). +-spec(start(fun(), integer(), any()) -> {ok, keepalive()} | {error, term()}). start(_, 0, _) -> {ok, #keepalive{}}; start(StatFun, TimeoutSec, TimeoutMsg) -> @@ -43,7 +43,7 @@ start(StatFun, TimeoutSec, TimeoutMsg) -> end. %% @doc Check keepalive, called when timeout. --spec(check(keepalive()) -> {ok, keepalive()} | {error, any()}). +-spec(check(keepalive()) -> {ok, keepalive()} | {error, term()}). check(KeepAlive = #keepalive{statfun = StatFun, statval = LastVal, repeat = Repeat}) -> case StatFun() of {ok, NewVal} -> diff --git a/src/emqttd_metrics.erl b/src/emqttd_metrics.erl index c21d274a0..17e6e96d4 100644 --- a/src/emqttd_metrics.erl +++ b/src/emqttd_metrics.erl @@ -96,8 +96,8 @@ %% API %%-------------------------------------------------------------------- -%% @doc Start metrics server --spec(start_link() -> {ok, pid()} | ignore | {error, any()}). +%% @doc Start the metrics server +-spec(start_link() -> {ok, pid()} | ignore | {error, term()}). start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). diff --git a/src/emqttd_mod_sup.erl b/src/emqttd_mod_sup.erl index 586d5af1a..749b84a42 100644 --- a/src/emqttd_mod_sup.erl +++ b/src/emqttd_mod_sup.erl @@ -42,7 +42,7 @@ start_child(ChildSpec) when is_tuple(ChildSpec) -> start_child(Mod, Type) when is_atom(Mod) andalso is_atom(Type) -> supervisor:start_child(?MODULE, ?CHILD(Mod, Type)). --spec(stop_child(any()) -> ok | {error, any()}). +-spec(stop_child(any()) -> ok | {error, term()}). stop_child(ChildId) -> case supervisor:terminate_child(?MODULE, ChildId) of ok -> supervisor:delete_child(?MODULE, ChildId); diff --git a/src/emqttd_parser.erl b/src/emqttd_parser.erl index 27be18a25..4699b3f77 100644 --- a/src/emqttd_parser.erl +++ b/src/emqttd_parser.erl @@ -39,7 +39,7 @@ initial_state(MaxSize) -> %% @doc Parse MQTT Packet -spec(parse(binary(), {none, pos_integer()} | fun()) - -> {ok, mqtt_packet()} | {error, any()} | {more, fun()}). + -> {ok, mqtt_packet()} | {error, term()} | {more, fun()}). parse(<<>>, {none, MaxLen}) -> {more, fun(Bin) -> parse(Bin, {none, MaxLen}) end}; parse(<>, {none, Limit}) -> diff --git a/src/emqttd_plugins.erl b/src/emqttd_plugins.erl index 73f184a3a..81ff61a4d 100644 --- a/src/emqttd_plugins.erl +++ b/src/emqttd_plugins.erl @@ -47,7 +47,7 @@ init_config(CfgFile) -> end, AppsEnv). %% @doc Load all plugins when the broker started. --spec(load() -> list() | {error, any()}). +-spec(load() -> list() | {error, term()}). load() -> case emqttd:env(plugins_loaded_file) of {ok, File} -> @@ -80,7 +80,7 @@ load_plugins(Names, Persistent) -> [load_plugin(find_plugin(Name, Plugins), Persistent) || Name <- NeedToLoad]. %% @doc Unload all plugins before broker stopped. --spec(unload() -> list() | {error, any()}). +-spec(unload() -> list() | {error, term()}). unload() -> case emqttd:env(plugins_loaded_file) of {ok, File} -> @@ -119,7 +119,7 @@ plugin(CfgFile) -> #mqtt_plugin{name = AppName, version = Ver, descr = Descr}. %% @doc Load a Plugin --spec(load(atom()) -> ok | {error, any()}). +-spec(load(atom()) -> ok | {error, term()}). load(PluginName) when is_atom(PluginName) -> case lists:member(PluginName, names(started_app)) of true -> @@ -172,7 +172,7 @@ find_plugin(Name, Plugins) -> lists:keyfind(Name, 2, Plugins). %% @doc UnLoad a Plugin --spec(unload(atom()) -> ok | {error, any()}). +-spec(unload(atom()) -> ok | {error, term()}). unload(PluginName) when is_atom(PluginName) -> case {lists:member(PluginName, names(started_app)), lists:member(PluginName, names(plugin))} of {true, true} -> diff --git a/src/emqttd_pool_sup.erl b/src/emqttd_pool_sup.erl index fa1d6aace..87654bcff 100644 --- a/src/emqttd_pool_sup.erl +++ b/src/emqttd_pool_sup.erl @@ -36,12 +36,12 @@ spec(ChildId, Args) -> {ChildId, {?MODULE, start_link, Args}, transient, infinity, supervisor, [?MODULE]}. --spec(start_link(atom() | tuple(), atom(), mfa()) -> {ok, pid()} | {error, any()}). +-spec(start_link(atom() | tuple(), atom(), mfa()) -> {ok, pid()} | {error, term()}). start_link(Pool, Type, MFA) -> Schedulers = erlang:system_info(schedulers), start_link(Pool, Type, Schedulers, MFA). --spec(start_link(atom(), atom(), pos_integer(), mfa()) -> {ok, pid()} | {error, any()}). +-spec(start_link(atom(), atom(), pos_integer(), mfa()) -> {ok, pid()} | {error, term()}). start_link(Pool, Type, Size, MFA) -> supervisor:start_link(?MODULE, [Pool, Type, Size, MFA]). diff --git a/src/emqttd_pooler.erl b/src/emqttd_pooler.erl index 1b73d138c..a74e01fec 100644 --- a/src/emqttd_pooler.erl +++ b/src/emqttd_pooler.erl @@ -40,7 +40,7 @@ start_link() -> %% API %%-------------------------------------------------------------------- --spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, any()}). +-spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}). start_link(Pool, Id) -> gen_server:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id], []). diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 31354dd84..35c914b14 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -124,7 +124,7 @@ session(#proto_state{session = Session}) -> %% CONNECT – Client requests a connection to a Server %% A Client can only send the CONNECT Packet once over a Network Connection. --spec(received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}). +-spec(received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, term()}). received(Packet = ?PACKET(?CONNECT), State = #proto_state{connected = false, stats_data = Stats}) -> trace(recv, Packet, State), Stats1 = inc_stats(recv, ?CONNECT, Stats), diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 506743d03..aa08d746c 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -172,7 +172,7 @@ "Session(~s): " ++ Format, [State#state.client_id | Args])). %% @doc Start a Session --spec(start_link(boolean(), {mqtt_client_id(), mqtt_username()}, pid()) -> {ok, pid()} | {error, any()}). +-spec(start_link(boolean(), {mqtt_client_id(), mqtt_username()}, pid()) -> {ok, pid()} | {error, term()}). start_link(CleanSess, {ClientId, Username}, ClientPid) -> gen_server2:start_link(?MODULE, [CleanSess, {ClientId, Username}, ClientPid], []). @@ -192,7 +192,7 @@ subscribe(Session, PacketId, TopicTable) -> %%TODO: the ack function??... gen_server2:cast(Session, {subscribe, From, TopicTable, AckFun}). %% @doc Publish Message --spec(publish(pid(), mqtt_message()) -> ok | {error, any()}). +-spec(publish(pid(), mqtt_message()) -> ok | {error, term()}). publish(_Session, Msg = #mqtt_message{qos = ?QOS_0}) -> %% Publish QoS0 Directly emqttd_server:publish(Msg), ok; @@ -582,9 +582,9 @@ handle_info(Info, Session) -> ?UNEXPECTED_INFO(Info, Session). terminate(Reason, #state{client_id = ClientId, username = Username}) -> - emqttd_stats:del_session_stats(ClientId), + %% Move to emqttd_sm to avoid race condition + %%emqttd_stats:del_session_stats(ClientId), emqttd_hooks:run('session.terminated', [ClientId, Username, Reason]), - emqttd_server:subscriber_down(ClientId), emqttd_sm:unregister_session(ClientId). code_change(_OldVsn, Session, _Extra) -> diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index dacb0b6a1..8d4c220b2 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -76,12 +76,12 @@ mnesia(copy) -> %%-------------------------------------------------------------------- %% @doc Start a session manager --spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, any()}). +-spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}). start_link(Pool, Id) -> gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id], []). %% @doc Start a session --spec(start_session(boolean(), {binary(), binary() | undefined}) -> {ok, pid(), boolean()} | {error, any()}). +-spec(start_session(boolean(), {binary(), binary() | undefined}) -> {ok, pid(), boolean()} | {error, term()}). start_session(CleanSess, {ClientId, Username}) -> SM = gproc_pool:pick_worker(?POOL, ClientId), call(SM, {start_session, CleanSess, {ClientId, Username}, self()}). @@ -107,6 +107,7 @@ unregister_session(ClientId) -> unregister_session(ClientId, Pid) -> case ets:lookup(mqtt_local_session, ClientId) of [LocalSess = {_, Pid, _, _}] -> + emqttd_stats:del_session_stats(ClientId), ets:delete_object(mqtt_local_session, LocalSess); _ -> false @@ -187,7 +188,6 @@ handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> [] -> ok; [Sess = #mqtt_session{sess_pid = DownPid}] -> - emqttd_stats:del_session_stats(ClientId), mnesia:delete_object(mqtt_session, Sess, write); [_Sess] -> ok diff --git a/src/emqttd_sm_helper.erl b/src/emqttd_sm_helper.erl index 2e8e9a749..0721339fd 100644 --- a/src/emqttd_sm_helper.erl +++ b/src/emqttd_sm_helper.erl @@ -39,7 +39,7 @@ -define(LOCK, {?MODULE, clean_sessions}). %% @doc Start a session helper --spec(start_link(fun()) -> {ok, pid()} | ignore | {error, any()}). +-spec(start_link(fun()) -> {ok, pid()} | ignore | {error, term()}). start_link(StatsFun) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [StatsFun], []). diff --git a/src/emqttd_trace.erl b/src/emqttd_trace.erl index eda84bcb1..05734c2d2 100644 --- a/src/emqttd_trace.erl +++ b/src/emqttd_trace.erl @@ -47,7 +47,7 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). %% @doc Start to trace client or topic. --spec(start_trace(trace_who(), string()) -> ok | {error, any()}). +-spec(start_trace(trace_who(), string()) -> ok | {error, term()}). start_trace({client, ClientId}, LogFile) -> start_trace({start_trace, {client, ClientId}, LogFile}); @@ -57,7 +57,7 @@ start_trace({topic, Topic}, LogFile) -> start_trace(Req) -> gen_server:call(?MODULE, Req, infinity). %% @doc Stop tracing client or topic. --spec(stop_trace(trace_who()) -> ok | {error, any()}). +-spec(stop_trace(trace_who()) -> ok | {error, term()}). stop_trace({client, ClientId}) -> gen_server:call(?MODULE, {stop_trace, {client, ClientId}}); stop_trace({topic, Topic}) ->