diff --git a/src/emqttd_mod_autosub.erl b/src/emqttd_mod_subscription.erl similarity index 56% rename from src/emqttd_mod_autosub.erl rename to src/emqttd_mod_subscription.erl index dff147969..a4d786f26 100644 --- a/src/emqttd_mod_autosub.erl +++ b/src/emqttd_mod_subscription.erl @@ -19,11 +19,11 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc emqttd auto subscribe module. +%%% @doc Subscription from Broker Side %%% %%% @author Feng Lee %%%----------------------------------------------------------------------------- --module(emqttd_mod_autosub). +-module(emqttd_mod_subscription). -behaviour(emqttd_gen_mod). @@ -33,29 +33,45 @@ -export([load/1, client_connected/3, unload/1]). --record(state, {topics}). +-record(state, {topics, stored = false}). + +-ifdef(TEST). +-compile(export_all). +-endif. load(Opts) -> - Topics = [{list_to_binary(Topic), Qos} || {Topic, Qos} <- Opts, 0 =< Qos, Qos =< 2], + Topics = [{bin(Topic), QoS} || {Topic, QoS} <- Opts, ?IS_QOS(QoS)], + State = #state{topics = Topics, stored = lists:member(stored, Opts)}, emqttd_broker:hook('client.connected', {?MODULE, client_connected}, - {?MODULE, client_connected, [Topics]}), - {ok, #state{topics = Topics}}. + {?MODULE, client_connected, [State]}), + {ok, State}. -client_connected(?CONNACK_ACCEPT, #mqtt_client{client_id = ClientId, +client_connected(?CONNACK_ACCEPT, #mqtt_client{client_id = ClientId, client_pid = ClientPid, - username = Username}, Topics) -> - F = fun(Topic) -> - Topic1 = emqttd_topic:feed_var(<<"$c">>, ClientId, Topic), - if - Username =:= undefined -> Topic1; - true -> emqttd_topic:feed_var(<<"$u">>, Username, Topic1) - end - end, - emqttd_client:subscribe(ClientPid, [{F(Topic), Qos} || {Topic, Qos} <- Topics]); + username = Username}, + #state{topics = Topics, stored = Stored}) -> + Replace = fun(Topic) -> rep(<<"$u">>, Username, rep(<<"$c">>, ClientId, Topic)) end, + TopicTable = with_stored(Stored, ClientId, [{Replace(Topic), Qos} || {Topic, Qos} <- Topics]), + emqttd_client:subscribe(ClientPid, TopicTable). -client_connected(_ConnAck, _Client, _Topics) -> - ignore. +with_stored(false, _ClientId, TopicTable) -> + TopicTable; +with_stored(true, ClientId, TopicTable) -> + Fun = fun(#mqtt_subscription{topic = Topic, qos = Qos}) -> {Topic, Qos} end, + emqttd_opts:merge([Fun(Sub) || Sub <- emqttd_pubsub:lookup(subscription, ClientId)], TopicTable). unload(_Opts) -> emqttd_broker:unhook('client.connected', {?MODULE, client_connected}). +rep(<<"$c">>, ClientId, Topic) -> + emqttd_topic:feed_var(<<"$c">>, ClientId, Topic); +rep(<<"$u">>, undefined, Topic) -> + Topic; +rep(<<"$u">>, Username, Topic) -> + emqttd_topic:feed_var(<<"$u">>, Username, Topic). + +bin(B) when is_binary(B) -> + B; +bin(S) when is_list(S) -> + list_to_binary(S). + diff --git a/test/emqttd_mod_subscription_tests.erl b/test/emqttd_mod_subscription_tests.erl new file mode 100644 index 000000000..4f1cffbef --- /dev/null +++ b/test/emqttd_mod_subscription_tests.erl @@ -0,0 +1,18 @@ +-module(emqttd_mod_subscription_tests). + +-include("emqttd.hrl"). + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +-define(M, emqttd_mod_subscription). + +rep_test() -> + ?assertEqual(<<"topic/clientId">>, ?M:rep(<<"$c">>, <<"clientId">>, <<"topic/$c">>)), + ?assertEqual(<<"topic/username">>, ?M:rep(<<"$u">>, <<"username">>, <<"topic/$u">>)), + ?assertEqual(<<"topic/username/clientId">>, + ?M:rep(<<"$c">>, <<"clientId">>, + ?M:rep(<<"$u">>, <<"username">>, <<"topic/$u/$c">>))). + +-endif.