emqx/lib-ce/emqx_modules/src/emqx_mod_subscription.erl

100 lines
3.3 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_subscription).
-behaviour(emqx_gen_mod).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
%% emqx_gen_mod callbacks
-export([ load/1
, unload/1
, description/0
]).
%% APIs
-export([on_client_connected/3]).
%%--------------------------------------------------------------------
%% Load/Unload Hook
%%--------------------------------------------------------------------
load(Topics) ->
emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}).
on_client_connected(#{clientid := ClientId, username := Username}, _ConnInfo = #{proto_ver := ProtoVer}, Topics) ->
OptFun = case ProtoVer of
?MQTT_PROTO_V5 -> fun(X) -> X end;
_ -> fun(#{qos := Qos}) -> #{qos => Qos} end
end,
Fold = fun({Topic, SubOpts}, Acc) ->
case rep(Topic, ClientId, Username) of
{error, Reason} ->
?LOG(warning, "auto subscribe ignored, topic filter:~ts reason:~p~n",
[Topic, Reason]),
Acc;
<<>> ->
?LOG(warning, "auto subscribe ignored, topic filter:~ts"
" reason: topic can't be empty~n",
[Topic]),
Acc;
NewTopic ->
[{NewTopic, OptFun(SubOpts)} | Acc]
end
end,
case lists:foldl(Fold, [], Topics) of
[] -> ok;
TopicFilters ->
self() ! {subscribe, TopicFilters}
end.
unload(_) ->
emqx_hooks:del('client.connected', {?MODULE, on_client_connected}).
description() ->
"EMQ X Subscription Module".
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
rep(Topic, ClientId, Username) ->
Words = emqx_topic:words(Topic),
rep(Words, ClientId, Username, []).
rep([<<"%c">> | T], ClientId, Username, Acc) ->
rep(T,
ClientId,
Username,
[ClientId | Acc]);
rep([<<"%u">> | _], _, undefined, _) ->
{error, username_undefined};
rep([<<"%u">> | T], ClientId, Username, Acc) ->
rep(T,
ClientId,
Username,
[Username | Acc]);
rep([H | T], ClientId, UserName, Acc) ->
rep(T, ClientId, UserName, [H | Acc]);
rep([], _, _, Acc) ->
emqx_topic:join(lists:reverse(Acc)).