subscription

This commit is contained in:
Feng 2015-12-17 12:38:13 +08:00
parent e37b00a9e4
commit fe82fde717
2 changed files with 52 additions and 18 deletions

View File

@ -19,11 +19,11 @@
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc emqttd auto subscribe module.
%%% @doc Subscription from Broker Side
%%%
%%% @author Feng Lee <feng@emqtt.io>
%%%-----------------------------------------------------------------------------
-module(emqttd_mod_autosub).
-module(emqttd_mod_subscription).
-behaviour(emqttd_gen_mod).
@ -33,29 +33,45 @@
-export([load/1, client_connected/3, unload/1]).
-record(state, {topics}).
-record(state, {topics, stored = false}).
-ifdef(TEST).
-compile(export_all).
-endif.
load(Opts) ->
Topics = [{list_to_binary(Topic), Qos} || {Topic, Qos} <- Opts, 0 =< Qos, Qos =< 2],
Topics = [{bin(Topic), QoS} || {Topic, QoS} <- Opts, ?IS_QOS(QoS)],
State = #state{topics = Topics, stored = lists:member(stored, Opts)},
emqttd_broker:hook('client.connected', {?MODULE, client_connected},
{?MODULE, client_connected, [Topics]}),
{ok, #state{topics = Topics}}.
{?MODULE, client_connected, [State]}),
{ok, State}.
client_connected(?CONNACK_ACCEPT, #mqtt_client{client_id = ClientId,
client_connected(?CONNACK_ACCEPT, #mqtt_client{client_id = ClientId,
client_pid = ClientPid,
username = Username}, Topics) ->
F = fun(Topic) ->
Topic1 = emqttd_topic:feed_var(<<"$c">>, ClientId, Topic),
if
Username =:= undefined -> Topic1;
true -> emqttd_topic:feed_var(<<"$u">>, Username, Topic1)
end
end,
emqttd_client:subscribe(ClientPid, [{F(Topic), Qos} || {Topic, Qos} <- Topics]);
username = Username},
#state{topics = Topics, stored = Stored}) ->
Replace = fun(Topic) -> rep(<<"$u">>, Username, rep(<<"$c">>, ClientId, Topic)) end,
TopicTable = with_stored(Stored, ClientId, [{Replace(Topic), Qos} || {Topic, Qos} <- Topics]),
emqttd_client:subscribe(ClientPid, TopicTable).
client_connected(_ConnAck, _Client, _Topics) ->
ignore.
with_stored(false, _ClientId, TopicTable) ->
TopicTable;
with_stored(true, ClientId, TopicTable) ->
Fun = fun(#mqtt_subscription{topic = Topic, qos = Qos}) -> {Topic, Qos} end,
emqttd_opts:merge([Fun(Sub) || Sub <- emqttd_pubsub:lookup(subscription, ClientId)], TopicTable).
unload(_Opts) ->
emqttd_broker:unhook('client.connected', {?MODULE, client_connected}).
rep(<<"$c">>, ClientId, Topic) ->
emqttd_topic:feed_var(<<"$c">>, ClientId, Topic);
rep(<<"$u">>, undefined, Topic) ->
Topic;
rep(<<"$u">>, Username, Topic) ->
emqttd_topic:feed_var(<<"$u">>, Username, Topic).
bin(B) when is_binary(B) ->
B;
bin(S) when is_list(S) ->
list_to_binary(S).

View File

@ -0,0 +1,18 @@
-module(emqttd_mod_subscription_tests).
-include("emqttd.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-define(M, emqttd_mod_subscription).
rep_test() ->
?assertEqual(<<"topic/clientId">>, ?M:rep(<<"$c">>, <<"clientId">>, <<"topic/$c">>)),
?assertEqual(<<"topic/username">>, ?M:rep(<<"$u">>, <<"username">>, <<"topic/$u">>)),
?assertEqual(<<"topic/username/clientId">>,
?M:rep(<<"$c">>, <<"clientId">>,
?M:rep(<<"$u">>, <<"username">>, <<"topic/$u/$c">>))).
-endif.