emqx/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_placeho...

102 lines
3.8 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_auto_subscribe_placeholder).
-include_lib("emqx/include/emqx_placeholder.hrl").
-include_lib("emqx/include/logger.hrl").
-export([generate/1]).
-export([to_topic_table/3]).
-spec generate(list() | map()) -> list() | map().
generate(Topics) when is_list(Topics) ->
[generate(Topic) || Topic <- Topics];
generate(T = #{topic := Topic}) ->
T#{placeholder => generate(Topic, [])}.
-spec to_topic_table(list(), map(), map()) -> list().
to_topic_table(PHs, ClientInfo, ConnInfo) ->
Fold = fun(
#{
qos := Qos,
rh := RH,
rap := RAP,
nl := NL,
placeholder := PlaceHolder,
topic := RawTopic
},
Acc
) ->
case to_topic(PlaceHolder, ClientInfo, ConnInfo, []) of
{error, Reason} ->
?SLOG(warning, #{
msg => "auto_subscribe_ignored",
topic => RawTopic,
reason => Reason
}),
Acc;
<<>> ->
?SLOG(warning, #{
msg => "auto_subscribe_ignored",
topic => RawTopic,
reason => empty_topic
}),
Acc;
Topic0 ->
{Topic, Opts} = emqx_topic:parse(Topic0),
[{Topic, Opts#{qos => Qos, rh => RH, rap => RAP, nl => NL}} | Acc]
end
end,
lists:foldl(Fold, [], PHs).
%%--------------------------------------------------------------------
%% internal
generate(<<"">>, Result) ->
lists:reverse(Result);
generate(<<?PH_S_CLIENTID, Tail/binary>>, Result) ->
generate(Tail, [clientid | Result]);
generate(<<?PH_S_USERNAME, Tail/binary>>, Result) ->
generate(Tail, [username | Result]);
generate(<<?PH_S_HOST, Tail/binary>>, Result) ->
generate(Tail, [host | Result]);
generate(<<?PH_S_PORT, Tail/binary>>, Result) ->
generate(Tail, [port | Result]);
generate(<<Char:8, Tail/binary>>, []) ->
generate(Tail, [<<Char:8>>]);
generate(<<Char:8, Tail/binary>>, [R | Result]) when is_binary(R) ->
generate(Tail, [<<R/binary, Char:8>> | Result]);
generate(<<Char:8, Tail/binary>>, [R | Result]) when is_atom(R) ->
generate(Tail, [<<Char:8>> | [R | Result]]).
to_topic([], _, _, Res) ->
list_to_binary(lists:reverse(Res));
to_topic([Binary | PTs], C, Co, Res) when is_binary(Binary) ->
to_topic(PTs, C, Co, [Binary | Res]);
to_topic([clientid | PTs], C = #{clientid := ClientID}, Co, Res) ->
to_topic(PTs, C, Co, [ClientID | Res]);
to_topic([username | _], #{username := undefined}, _, _) ->
{error, username_undefined};
to_topic([username | PTs], C = #{username := Username}, Co, Res) ->
to_topic(PTs, C, Co, [Username | Res]);
to_topic([host | PTs], C, Co = #{peername := {Host, _}}, Res) ->
HostBinary = list_to_binary(inet:ntoa(Host)),
to_topic(PTs, C, Co, [HostBinary | Res]);
to_topic([port | PTs], C, Co = #{peername := {_, Port}}, Res) ->
PortBinary = integer_to_binary(Port),
to_topic(PTs, C, Co, [PortBinary | Res]).