diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 5c4b46217..65f535059 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -75,15 +75,19 @@ mnesia(copy) -> start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +-spec(subscribe(emqx_topic:group(), emqx_topic:topic(), pid()) -> ok). subscribe(Group, Topic, SubPid) when is_pid(SubPid) -> gen_server:call(?SERVER, {subscribe, Group, Topic, SubPid}). +-spec(unsubscribe(emqx_topic:group(), emqx_topic:topic(), pid()) -> ok). unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) -> gen_server:call(?SERVER, {unsubscribe, Group, Topic, SubPid}). record(Group, Topic, SubPid) -> #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. +-spec(dispatch(emqx_topic:group(), emqx_topic:topic(), emqx_types:delivery()) + -> emqx_types:delivery()). dispatch(Group, Topic, Delivery) -> dispatch(Group, Topic, Delivery, _FailedSubs = []). @@ -173,12 +177,12 @@ get_ack_ref(Msg) -> -spec(is_ack_required(emqx_types:message()) -> boolean()). is_ack_required(Msg) -> ?no_ack =/= get_ack_ref(Msg). -%% @doc Negative ack dropped message due to message queue being full. +%% @doc Negative ack dropped message due to inflight window or message queue being full. -spec(maybe_nack_dropped(emqx_types:message()) -> ok). maybe_nack_dropped(Msg) -> case get_ack_ref(Msg) of ?no_ack -> ok; - {Sender, Ref} -> nack(Sender, Ref, drpped) + {Sender, Ref} -> nack(Sender, Ref, dropped) end. %% @doc Negative ack message due to connection down. @@ -325,9 +329,9 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Internal functions -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% keep track of alive remote pids maybe_insert_alive_tab(Pid) when ?IS_LOCAL_PID(Pid) -> ok; diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index 2a37a873d..5339a7879 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -25,12 +25,13 @@ -export([systop/1]). -export([parse/1, parse/2]). +-type(group() :: binary()). -type(topic() :: binary()). -type(word() :: '' | '+' | '#' | binary()). -type(words() :: list(word())). -type(triple() :: {root | binary(), word(), binary()}). --export_type([topic/0, word/0, triple/0]). +-export_type([group/0, topic/0, word/0, triple/0]). -define(MAX_TOPIC_LEN, 4096).