emqx/apps/emqx_resource/src/emqx_resource.erl

814 lines
26 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_resource).
-include("emqx_resource.hrl").
-include("emqx_resource_errors.hrl").
-include_lib("emqx/include/logger.hrl").
%% APIs for resource types
-export([list_types/0]).
%% APIs for instances
-export([
check_config/2,
check_and_create_local/4,
check_and_create_local/5,
check_and_recreate_local/4
]).
%% Sync resource instances and files
%% provisional solution: rpc:multicall to all the nodes for creating/updating/removing
%% todo: replicate operations
-export([
%% store the config and start the instance
create_local/5,
create_dry_run_local/2,
create_dry_run_local/3,
create_dry_run_local/4,
recreate_local/4,
%% remove the config and stop the instance
remove_local/1,
reset_metrics/1,
reset_metrics_local/1,
reset_metrics_local/2,
%% Create metrics for a resource ID
create_metrics/1,
%% Delete metrics for a resource ID
clear_metrics/1
]).
%% Calls to the callback module with current resource state
%% They also save the state after the call finished (except call_get_channel_config/3).
-export([
start/1,
start/2,
restart/1,
restart/2,
%% verify if the resource is working normally
health_check/1,
channel_health_check/2,
get_channels/1,
%% set resource status to connecting
set_resource_status_connecting/1,
%% stop the instance
stop/1,
%% query the instance
query/2,
query/3,
%% query the instance without batching and queuing messages.
simple_sync_query/2,
%% functions used by connectors to register resources that must be
%% freed when stopping or even when a resource manager crashes.
allocate_resource/3,
has_allocated_resources/1,
get_allocated_resources/1,
get_allocated_resources_list/1,
forget_allocated_resources/1,
deallocate_resource/2,
clean_allocated_resources/2,
%% Get channel config from resource
call_get_channel_config/3,
% Call the format query result function
call_format_query_result/2
]).
%% Direct calls to the callback module
-export([
%% get the callback mode of a specific module
get_callback_mode/1,
get_resource_type/1,
%% start the instance
call_start/3,
%% verify if the resource is working normally
call_health_check/3,
%% verify if the resource channel is working normally
call_channel_health_check/4,
%% stop the instance
call_stop/3,
%% get the query mode of the resource
query_mode/3,
%% Add channel to resource
call_add_channel/5,
%% Remove channel from resource
call_remove_channel/4,
%% Get channels from resource
call_get_channels/2
]).
%% list all the instances, id only.
-export([
list_instances/0,
%% list all the instances
list_instances_verbose/0,
%% return the data of the instance
get_instance/1,
get_metrics/1,
fetch_creation_opts/1,
%% return all the instances of the same resource type
list_instances_by_type/1,
generate_id/1,
list_group_instances/1
]).
-export([apply_reply_fun/2]).
%% common validations
-export([
parse_resource_id/2,
validate_type/1,
validate_name/1
]).
-export([is_dry_run/1]).
-export_type([
query_mode/0,
resource_id/0,
channel_id/0,
resource_data/0,
resource_status/0
]).
-optional_callbacks([
on_query/3,
on_batch_query/3,
on_query_async/4,
on_batch_query_async/4,
on_get_status/2,
on_get_channel_status/3,
on_add_channel/4,
on_remove_channel/3,
on_get_channels/1,
query_mode/1,
on_format_query_result/1
]).
%% when calling emqx_resource:start/1
-callback on_start(resource_id(), resource_config()) ->
{ok, resource_state()} | {error, Reason :: term()}.
%% when calling emqx_resource:stop/1
-callback on_stop(resource_id(), resource_state()) -> term().
%% when calling emqx_resource:get_callback_mode/1
-callback callback_mode() -> callback_mode().
%% when calling emqx_resource:query/3
-callback on_query(resource_id(), Request :: term(), resource_state()) -> query_result().
%% when calling emqx_resource:on_batch_query/3
-callback on_batch_query(resource_id(), Request :: term(), resource_state()) ->
batch_query_result().
%% when calling emqx_resource:on_query_async/4
-callback on_query_async(
resource_id(),
Request :: term(),
{ReplyFun :: function(), Args :: list()},
resource_state()
) -> query_result().
%% when calling emqx_resource:on_batch_query_async/4
-callback on_batch_query_async(
resource_id(),
Request :: term(),
{ReplyFun :: function(), Args :: list()},
resource_state()
) -> query_result().
%% when calling emqx_resource:health_check/1
-callback on_get_status(resource_id(), resource_state()) ->
health_check_status()
| {health_check_status(), resource_state()}
| {health_check_status(), resource_state(), term()}.
-callback on_get_channel_status(resource_id(), channel_id(), resource_state()) ->
channel_status()
| {channel_status(), Reason :: term()}
| {error, term()}.
-callback query_mode(Config :: term()) -> query_mode().
%% This callback handles the installation of a specified channel.
%%
%% If the channel cannot be successfully installed, the callback shall
%% throw an exception or return an error tuple.
-callback on_add_channel(
ResId :: term(), ResourceState :: term(), ChannelId :: binary(), ChannelConfig :: map()
) -> {ok, term()} | {error, term()}.
%% This callback handles the removal of a specified channel resource.
%%
%% It's guaranteed that the provided channel is installed when this
%% function is invoked. Upon successful deinstallation, the function should return
%% a new state
%%
%% If the channel cannot be successfully deinstalled, the callback should
%% log an error.
%%
-callback on_remove_channel(
ResId :: term(), ResourceState :: term(), ChannelId :: binary()
) -> {ok, NewState :: term()}.
%% This callback shall return a list of channel configs that are currently active
%% for the resource with the given id.
-callback on_get_channels(
ResId :: term()
) -> [term()].
%% When given the result of an on_*query call this function should return a
%% version of the result that is suitable for JSON trace logging. This
%% typically means converting Erlang tuples to maps with appropriate names for
%% the values in the tuple.
-callback on_format_query_result(
QueryResult :: term()
) -> term().
%% Used for tagging log entries.
-callback resource_type() -> atom().
%% Evaluates EXPR inside an immediately-invoked fun and turns any exception
%% into an `{error, _}' return value instead of letting it propagate:
%%   * a thrown term `Reason' becomes `{error, Reason}';
%%   * any other exception becomes `{error, Map}', where the map holds the
%%     exception class plus the reason and stacktrace, both passed through
%%     `emqx_utils:redact/1'.
%% When EXPR raises nothing, its value is returned unchanged.
%% NOTE(review): the fun wrapper presumably keeps `Reason', `C', `E' and `S'
%% from clashing with variables at the call site -- confirm.
-define(SAFE_CALL(EXPR),
(fun() ->
try
EXPR
catch
throw:Reason ->
{error, Reason};
C:E:S ->
{error, #{
exception => C,
reason => emqx_utils:redact(E),
stacktrace => emqx_utils:redact(S)
}}
end
end)()
).
%% @doc List the loaded modules that implement this behaviour.
%% Simply forwards to `discover_resource_mods/0'.
-spec list_types() -> [module()].
list_types() ->
    ResourceMods = discover_resource_mods(),
    ResourceMods.
%% Scan the modules reported by `code:all_loaded/0' and keep those for which
%% `is_resource_mod/1' returns true.
-spec discover_resource_mods() -> [module()].
discover_resource_mods() ->
    LoadedMods = [Mod || {Mod, _Where} <- code:all_loaded()],
    lists:filter(fun is_resource_mod/1, LoadedMods).
%% Tell whether `Module' declares this module as one of its behaviours.
%%
%% Both spellings of the attribute (`behavior' and `behaviour') are honoured.
%% An attribute that occurs several times in a module shows up as several
%% entries in `module_info(attributes)', so all entries are collected with
%% `proplists:append_values/2' rather than only the first one.
-spec is_resource_mod(module()) -> boolean().
is_resource_mod(Module) ->
    Attrs = Module:module_info(attributes),
    Behaviours =
        proplists:append_values(behavior, Attrs) ++
            proplists:append_values(behaviour, Attrs),
    lists:member(?MODULE, Behaviours).
%% =================================================================================
%% APIs for resource instances
%% =================================================================================
%% @doc Store the config and start the instance.
%% Thin wrapper around `emqx_resource_manager:ensure_resource/5'.
-spec create_local(
    resource_id(),
    resource_group(),
    resource_module(),
    resource_config(),
    creation_opts()
) ->
    {ok, resource_data()}.
create_local(ResId, Group, ResourceMod, ResConf, CreationOpts) ->
    emqx_resource_manager:ensure_resource(
        ResId, Group, ResourceMod, ResConf, CreationOpts
    ).
%% @doc Dry-run creation of a resource. Every arity forwards its arguments
%% unchanged to the `emqx_resource_manager:create_dry_run' function of the
%% same arity.
-spec create_dry_run_local(resource_module(), resource_config()) ->
    ok | {error, Reason :: term()}.
create_dry_run_local(ResourceMod, ResConf) ->
    emqx_resource_manager:create_dry_run(ResourceMod, ResConf).

%% Same as above, with an explicit resource id.
create_dry_run_local(ResId, ResourceMod, ResConf) ->
    emqx_resource_manager:create_dry_run(ResId, ResourceMod, ResConf).

%% Same as above, additionally handing `OnReadyCallback' to the manager.
-spec create_dry_run_local(
    resource_id(),
    resource_module(),
    resource_config(),
    OnReadyCallback
) ->
    ok | {error, Reason :: term()}
when
    OnReadyCallback :: fun((resource_id()) -> ok | {error, Reason :: term()}).
create_dry_run_local(ResId, ResourceMod, ResConf, OnReadyCallback) ->
    emqx_resource_manager:create_dry_run(
        ResId, ResourceMod, ResConf, OnReadyCallback
    ).
%% @doc Recreate an existing resource with a new config.
%% Thin wrapper around `emqx_resource_manager:recreate/4'.
-spec recreate_local(
    resource_id(), resource_module(), resource_config(), creation_opts()
) ->
    {ok, resource_data()} | {error, Reason :: term()}.
recreate_local(ResId, ResourceMod, ResConf, CreationOpts) ->
    emqx_resource_manager:recreate(ResId, ResourceMod, ResConf, CreationOpts).
%% @doc Remove the resource `ResId' through `emqx_resource_manager:remove/1'.
%% Always returns `ok': a successful removal and `{error, not_found}' are
%% accepted silently; any other result is logged as a warning and otherwise
%% ignored.
-spec remove_local(resource_id()) -> ok.
remove_local(ResId) ->
    case emqx_resource_manager:remove(ResId) of
        Benign when Benign =:= ok; Benign =:= {error, not_found} ->
            ok;
        Failure ->
            %% Only log, the ResId worker is always removed in manager's remove action.
            ?SLOG(
                warning,
                #{
                    msg => "remove_resource_failed",
                    error => Failure,
                    resource_id => ResId
                },
                #{tag => ?TAG}
            ),
            ok
    end.
%% Tip: do not delete reset_metrics_local/1; it is used by RPC before v572.
-spec reset_metrics_local(resource_id()) -> ok.
reset_metrics_local(ResId) ->
    NoClusterOpts = #{},
    reset_metrics_local(ResId, NoClusterOpts).

%% The second argument is accepted but not used; the reset is delegated to
%% `emqx_resource_manager:reset_metrics/1'.
-spec reset_metrics_local(resource_id(), map()) -> ok.
reset_metrics_local(ResId, _IgnoredClusterOpts) ->
    emqx_resource_manager:reset_metrics(ResId).
%% @doc Reset the metrics of `ResId' via `emqx_resource_proto_v2:reset_metrics/1'.
-spec reset_metrics(resource_id()) -> ok | {error, Reason :: term()}.
reset_metrics(ResId) ->
    Outcome = emqx_resource_proto_v2:reset_metrics(ResId),
    Outcome.
%% =================================================================================
%% @doc Query the resource with empty query options; see `query/3'.
-spec query(resource_id(), Request :: term()) -> Result :: term().
query(ResId, Request) ->
    DefaultOpts = #{},
    query(ResId, Request, DefaultOpts).
%% @doc Send `Request' to the resource `ResId'.
%%
%% The query mode and last error are looked up through
%% `emqx_resource_manager:get_query_mode_and_last_error/2':
%%   * an `{error, _}' lookup result is returned as is;
%%   * when the last error marks the target as unhealthy, the `matched' and
%%     `dropped.resource_stopped' counters are incremented and a resource
%%     error is returned without dispatching the request;
%%   * otherwise the request is dispatched to the
%%     `emqx_resource_buffer_worker' entry point selected by the query mode.
-spec query(resource_id(), Request :: term(), query_opts()) ->
    Result :: term().
query(ResId, Request, Opts) ->
    ModeAndLastError = emqx_resource_manager:get_query_mode_and_last_error(ResId, Opts),
    case ModeAndLastError of
        {error, _} ->
            ModeAndLastError;
        %% The unhealthy-target clauses must stay ahead of the mode clauses:
        %% they apply whatever the query mode is.
        {ok, {_, unhealthy_target}} ->
            emqx_resource_metrics:matched_inc(ResId),
            emqx_resource_metrics:dropped_resource_stopped_inc(ResId),
            ?RESOURCE_ERROR(unhealthy_target, "unhealthy target");
        {ok, {_, {unhealthy_target, Message}}} ->
            emqx_resource_metrics:matched_inc(ResId),
            emqx_resource_metrics:dropped_resource_stopped_inc(ResId),
            ?RESOURCE_ERROR(unhealthy_target, Message);
        {ok, {QueryMode, _}} when
            QueryMode =:= simple_async;
            QueryMode =:= simple_async_internal_buffer
        ->
            %% `simple_async_internal_buffer' is for bridges/connectors that
            %% have internal buffering, such as Kafka and Pulsar producers.
            %% TODO(5.1.1): pass Resource instead of ResId to simple APIs
            %% so the buffer worker does not need to lookup the cache again
            emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts);
        {ok, {simple_sync, _}} ->
            %% TODO(5.1.1): pass Resource instead of ResId to simple APIs
            %% so the buffer worker does not need to lookup the cache again
            emqx_resource_buffer_worker:simple_sync_query(ResId, Request, Opts);
        {ok, {simple_sync_internal_buffer, _}} ->
            %% This is for bridges/connectors that have internal buffering, such
            %% as Kafka and Pulsar producers.
            %% TODO(5.1.1): pass Resource instead of ResId to simple APIs
            %% so the buffer worker does not need to lookup the cache again
            emqx_resource_buffer_worker:simple_sync_internal_buffer_query(
                ResId, Request, Opts
            );
        {ok, {sync, _}} ->
            emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
        {ok, {async, _}} ->
            emqx_resource_buffer_worker:async_query(ResId, Request, Opts)
    end.
%% @doc Query the instance without batching and queuing messages.
%% Forwards to `emqx_resource_buffer_worker:simple_sync_query/2'.
-spec simple_sync_query(resource_id(), Request :: term()) -> Result :: term().
simple_sync_query(ResId, Request) ->
    Reply = emqx_resource_buffer_worker:simple_sync_query(ResId, Request),
    Reply.
%% @doc Start the resource with empty options.
-spec start(resource_id()) -> ok | {error, Reason :: term()}.
start(ResId) ->
    DefaultOpts = #{},
    start(ResId, DefaultOpts).

%% @doc Start the resource; forwards to `emqx_resource_manager:start/2'.
-spec start(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}.
start(ResId, CreationOpts) ->
    emqx_resource_manager:start(ResId, CreationOpts).
%% @doc Restart the resource with empty options.
-spec restart(resource_id()) -> ok | {error, Reason :: term()}.
restart(ResId) ->
    DefaultOpts = #{},
    restart(ResId, DefaultOpts).

%% @doc Restart the resource; forwards to `emqx_resource_manager:restart/2'.
-spec restart(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}.
restart(ResId, CreationOpts) ->
    emqx_resource_manager:restart(ResId, CreationOpts).
%% @doc Stop the instance; forwards to `emqx_resource_manager:stop/1'.
-spec stop(resource_id()) -> ok | {error, Reason :: term()}.
stop(ResId) ->
    Outcome = emqx_resource_manager:stop(ResId),
    Outcome.
%% @doc Verify if the resource is working normally.
%% Forwards to `emqx_resource_manager:health_check/1'.
-spec health_check(resource_id()) -> {ok, resource_status()} | {error, term()}.
health_check(ResId) ->
    Outcome = emqx_resource_manager:health_check(ResId),
    Outcome.
%% @doc Health-check a single channel of a resource.
%% Forwards to `emqx_resource_manager:channel_health_check/2'.
-spec channel_health_check(resource_id(), channel_id()) ->
    #{status := resource_status(), error := term()}.
channel_health_check(ResId, ChannelId) ->
    StatusMap = emqx_resource_manager:channel_health_check(ResId, ChannelId),
    StatusMap.
%% @doc Return the channels of the resource `ResId'.
%% The callback module is taken from the cached resource data; an unknown
%% resource yields `{error, not_found}'.
-spec get_channels(resource_id()) -> {ok, [{binary(), map()}]} | {error, term()}.
get_channels(ResId) ->
    case emqx_resource_manager:lookup_cached(ResId) of
        {ok, _Group, #{mod := CallbackMod}} ->
            Channels = emqx_resource:call_get_channels(ResId, CallbackMod),
            {ok, Channels};
        {error, not_found} = NotFound ->
            NotFound
    end.
%% @doc Forwards to `emqx_resource_manager:set_resource_status_connecting/1'.
set_resource_status_connecting(ResId) ->
    Outcome = emqx_resource_manager:set_resource_status_connecting(ResId),
    Outcome.
%% @doc Return the data of the instance, as given by
%% `emqx_resource_manager:lookup_cached/1'.
-spec get_instance(resource_id()) ->
    {ok, resource_group(), resource_data()} | {error, Reason :: term()}.
get_instance(ResId) ->
    Lookup = emqx_resource_manager:lookup_cached(ResId),
    Lookup.
%% @doc Return the metrics of `ResId' via `emqx_resource_manager:get_metrics/1'.
-spec get_metrics(resource_id()) ->
    emqx_metrics_worker:metrics().
get_metrics(ResId) ->
    Metrics = emqx_resource_manager:get_metrics(ResId),
    Metrics.
%% @doc Extract the value stored under `resource_opts' in `Opts', or an empty
%% map when that key is absent.
-spec fetch_creation_opts(map()) -> creation_opts().
fetch_creation_opts(Opts) ->
    case maps:find(resource_opts, Opts) of
        {ok, ResourceOpts} -> ResourceOpts;
        error -> #{}
    end.
%% @doc List the ids of all resource instances.
%%
%% Reads the ids directly from `emqx_resource_manager:list_all/0'. Going
%% through `list_instances_verbose/0' would fetch the metrics of every
%% instance only to throw them away, and would fail if any single metrics
%% lookup failed.
-spec list_instances() -> [resource_id()].
list_instances() ->
    [Id || #{id := Id} <- emqx_resource_manager:list_all()].
%% @doc List all instances, each extended with a `metrics' key holding the
%% result of `get_metrics/1' for that instance. Entries returned by
%% `emqx_resource_manager:list_all/0' that carry no `id' key are skipped.
-spec list_instances_verbose() -> [_ResourceDataWithMetrics :: map()].
list_instances_verbose() ->
    WithMetrics = fun
        (#{id := ResId} = Res) -> {true, Res#{metrics => get_metrics(ResId)}};
        (_NoId) -> false
    end,
    lists:filtermap(WithMetrics, emqx_resource_manager:list_all()).
%% @doc Return the ids of all instances whose callback module is exactly
%% `ResourceType'.
-spec list_instances_by_type(module()) -> [resource_id()].
list_instances_by_type(ResourceType) ->
    SameType = fun(_Id, Mod) -> Mod =:= ResourceType end,
    filter_instances(SameType).
%% @doc Build a resource id of the form `Name:N', where `N' is a positive,
%% monotonically increasing integer from `erlang:unique_integer/1'.
%% Only binary names are accepted.
-spec generate_id(term()) -> resource_id().
generate_id(Name) when is_binary(Name) ->
    Unique = erlang:unique_integer([monotonic, positive]),
    iolist_to_binary([Name, $:, integer_to_binary(Unique)]).
%% @doc List the instances of a group via `emqx_resource_manager:list_group/1'.
-spec list_group_instances(resource_group()) -> [resource_id()].
list_group_instances(Group) ->
    Members = emqx_resource_manager:list_group(Group),
    Members.
%% @doc Get the callback mode of a specific module by calling its
%% `callback_mode/0' callback.
-spec get_callback_mode(module()) -> callback_mode().
get_callback_mode(Mod) ->
    CallbackMode = Mod:callback_mode(),
    CallbackMode.
%% @doc Get the resource type of a module by calling its `resource_type/0'
%% callback.
-spec get_resource_type(module()) -> resource_type().
get_resource_type(Mod) ->
    ResourceType = Mod:resource_type(),
    ResourceType.
%% @doc Start the instance by calling `Mod:on_start/2'.
%% Leftover allocated resources are cleaned up first. Exceptions raised by
%% either step are converted into `{error, _}' by `?SAFE_CALL'.
-spec call_start(resource_id(), module(), resource_config()) ->
    {ok, resource_state()} | {error, Reason :: term()}.
call_start(ResId, Mod, ResConf) ->
    ?SAFE_CALL(
        begin
            %% A previous manager process may have crashed before releasing
            %% what it had allocated; release it now.
            clean_allocated_resources(ResId, Mod),
            Mod:on_start(ResId, ResConf)
        end
    ).
%% @doc Verify if the resource is working normally by calling
%% `Mod:on_get_status/2'; exceptions become `{error, _}' via `?SAFE_CALL'.
-spec call_health_check(resource_id(), module(), resource_state()) ->
    resource_status()
    | {resource_status(), resource_state()}
    | {resource_status(), resource_state(), term()}
    | {error, term()}.
call_health_check(ResId, Mod, State) ->
    ?SAFE_CALL(
        Mod:on_get_status(ResId, State)
    ).
%% @doc Verify if the resource channel is working normally by calling
%% `Mod:on_get_channel_status/3'; exceptions become `{error, _}' via
%% `?SAFE_CALL'.
-spec call_channel_health_check(resource_id(), channel_id(), module(), resource_state()) ->
    channel_status()
    | {channel_status(), Reason :: term()}
    | {error, term()}.
call_channel_health_check(ResId, ChannelId, Mod, State) ->
    ?SAFE_CALL(
        Mod:on_get_channel_status(ResId, ChannelId, State)
    ).
%% @doc Add a channel to the resource by calling `Mod:on_add_channel/4'.
%% When the module does not export that callback, an `{error, Binary}' naming
%% the resource id is returned. Exceptions raised by the callback become
%% `{error, _}' via `?SAFE_CALL'.
call_add_channel(ResId, Mod, ResourceState, ChannelId, ChannelConfig) ->
    case erlang:function_exported(Mod, on_add_channel, 4) of
        false ->
            Msg =
                <<"on_add_channel callback function not available for connector with resource id ",
                    ResId/binary>>,
            {error, Msg};
        true ->
            ?SAFE_CALL(
                Mod:on_add_channel(ResId, ResourceState, ChannelId, ChannelConfig)
            )
    end.
%% @doc Remove a channel from the resource by calling
%% `Mod:on_remove_channel/3'. When the module does not export that callback,
%% an `{error, Binary}' naming the resource id is returned. Exceptions raised
%% by the callback become `{error, _}' via `?SAFE_CALL'.
call_remove_channel(ResId, Mod, ResourceState, ChannelId) ->
    case erlang:function_exported(Mod, on_remove_channel, 3) of
        false ->
            Msg =
                <<"on_remove_channel callback function not available for connector with resource id ",
                    ResId/binary>>,
            {error, Msg};
        true ->
            ?SAFE_CALL(
                Mod:on_remove_channel(ResId, ResourceState, ChannelId)
            )
    end.
%% @doc Get channels from the resource by calling `Mod:on_get_channels/1'.
%% Modules that do not export the callback yield an empty list.
call_get_channels(ResId, Mod) ->
    HasCallback = erlang:function_exported(Mod, on_get_channels, 1),
    case HasCallback of
        false -> [];
        true -> Mod:on_get_channels(ResId)
    end.
%% @doc Get the config of channel `ChannelId' from the resource `ResId'.
%%
%% The channel list comes from `Mod:on_get_channels/1'. Returns the channel
%% config when exactly one entry has the given id; otherwise (no entry, or
%% several) returns `{error, Binary}'. `{error, Binary}' is also returned when
%% `Mod' does not export `on_get_channels/1'.
%%
%% Both error messages now separate the interpolated id from the surrounding
%% text with a space; they used to run together ("...<id>not found").
call_get_channel_config(ResId, ChannelId, Mod) ->
    case erlang:function_exported(Mod, on_get_channels, 1) of
        true ->
            ChConfigs = Mod:on_get_channels(ResId),
            case [Conf || {ChId, Conf} <- ChConfigs, ChId =:= ChannelId] of
                [ChannelConf] ->
                    ChannelConf;
                _ ->
                    {error,
                        <<"Channel ", ChannelId/binary,
                            " not found. There seems to be a broken reference">>}
            end;
        false ->
            {error,
                <<"on_get_channels callback function not available for resource id ",
                    ResId/binary>>}
    end.
%% @doc Call the format query result function of `Mod', if it has one.
%% Modules that do not export `on_format_query_result/1' get `Result' back
%% untouched.
call_format_query_result(Mod, Result) ->
    CanFormat = erlang:function_exported(Mod, on_format_query_result, 1),
    case CanFormat of
        false -> Result;
        true -> Mod:on_format_query_result(Result)
    end.
%% @doc Stop the instance by calling `Mod:on_stop/2' and return its result.
%% The allocated-resource entries of `ResId' are forgotten only when the
%% callback returns `ok'. Exceptions become `{error, _}' via `?SAFE_CALL'.
-spec call_stop(resource_id(), module(), resource_state()) -> term().
call_stop(ResId, Mod, ResourceState) ->
    ?SAFE_CALL(begin
        case Mod:on_stop(ResId, ResourceState) of
            ok ->
                emqx_resource:forget_allocated_resources(ResId),
                ok;
            Other ->
                Other
        end
    end).
%% @doc Get the query mode of the resource.
%% A module exporting `query_mode/1' decides based on `Config'; otherwise the
%% `query_mode' entry of `Opts' is used, defaulting to `sync'.
-spec query_mode(module(), term(), creation_opts()) -> query_mode().
query_mode(Mod, Config, Opts) ->
    ModDecides = erlang:function_exported(Mod, query_mode, 1),
    case ModDecides of
        false -> maps:get(query_mode, Opts, sync);
        true -> Mod:query_mode(Config)
    end.
%% @doc Check a raw config against the resource module by delegating to
%% `emqx_hocon:check/2'.
-spec check_config(resource_module(), raw_resource_config()) ->
    {ok, resource_config()} | {error, term()}.
check_config(ResourceMod, RawConf) ->
    emqx_hocon:check(ResourceMod, RawConf).
%% @doc Same as `check_and_create_local/5' with empty creation options.
-spec check_and_create_local(
    resource_id(),
    resource_group(),
    resource_module(),
    raw_resource_config()
) ->
    {ok, resource_data()} | {error, term()}.
check_and_create_local(ResId, Group, ResourceType, RawConfig) ->
    DefaultOpts = #{},
    check_and_create_local(ResId, Group, ResourceType, RawConfig, DefaultOpts).
%% @doc Check `RawConfig' and, when the check succeeds, create the resource
%% with the checked config through `create_local/5'. A failed check is
%% returned as is.
-spec check_and_create_local(
    resource_id(),
    resource_group(),
    resource_module(),
    raw_resource_config(),
    creation_opts()
) -> {ok, resource_data()} | {error, term()}.
check_and_create_local(ResId, Group, ResourceType, RawConfig, Opts) ->
    CreateFun = fun(CheckedConf) ->
        create_local(ResId, Group, ResourceType, CheckedConf, Opts)
    end,
    check_and_do(ResourceType, RawConfig, CreateFun).
%% @doc Check `RawConfig' and, when the check succeeds, recreate the resource
%% with the checked config through `recreate_local/4'. A failed check is
%% returned as is.
-spec check_and_recreate_local(
    resource_id(),
    resource_module(),
    raw_resource_config(),
    creation_opts()
) ->
    {ok, resource_data()} | {error, term()}.
check_and_recreate_local(ResId, ResourceType, RawConfig, Opts) ->
    RecreateFun = fun(CheckedConf) ->
        recreate_local(ResId, ResourceType, CheckedConf, Opts)
    end,
    check_and_do(ResourceType, RawConfig, RecreateFun).
%% Run `check_config/2' and apply `Do' to the checked config on success.
%% Anything other than `{ok, _}' is handed back to the caller unchanged.
check_and_do(ResourceType, RawConfig, Do) when is_function(Do) ->
    Checked = check_config(ResourceType, RawConfig),
    case Checked of
        {ok, CheckedConf} -> Do(CheckedConf);
        _NotOk -> Checked
    end.
%% @doc Deliver `Result' to whoever is waiting for it.
%% With a `{Fun, Args}' pair, `Fun' is applied to `Args' with `Result'
%% appended as the last argument and `ok' is returned. Anything else is
%% treated as a `gen_server' reply target.
apply_reply_fun({ReplyFun, Args}, Result) when is_function(ReplyFun) ->
    FullArgs = Args ++ [Result],
    _ = apply(ReplyFun, FullArgs),
    ok;
apply_reply_fun(ReplyTo, Result) ->
    gen_server:reply(ReplyTo, Result).
%% @doc Record `{InstanceId, Key, Value}' in the resource allocation table.
-spec allocate_resource(resource_id(), any(), term()) -> ok.
allocate_resource(InstanceId, Key, Value) ->
    Entry = {InstanceId, Key, Value},
    true = ets:insert(?RESOURCE_ALLOCATION_TAB, Entry),
    ok.
%% @doc Tell whether the allocation table holds any entry for `InstanceId'.
-spec has_allocated_resources(resource_id()) -> boolean().
has_allocated_resources(InstanceId) ->
    HasEntries = ets:member(?RESOURCE_ALLOCATION_TAB, InstanceId),
    HasEntries.
%% @doc Return the allocation entries of `InstanceId' as a `Key => Value'
%% map. When a key occurs more than once, the entry that comes last in the
%% lookup result wins.
-spec get_allocated_resources(resource_id()) -> map().
get_allocated_resources(InstanceId) ->
    Entries = ets:lookup(?RESOURCE_ALLOCATION_TAB, InstanceId),
    Collect = fun
        ({_InstanceId, Key, Value}, Acc) -> Acc#{Key => Value};
        (_Other, Acc) -> Acc
    end,
    lists:foldl(Collect, #{}, Entries).
%% @doc Return the raw allocation table entries stored for `InstanceId'.
-spec get_allocated_resources_list(resource_id()) -> list(tuple()).
get_allocated_resources_list(InstanceId) ->
    Entries = ets:lookup(?RESOURCE_ALLOCATION_TAB, InstanceId),
    Entries.
%% @doc Delete every allocation table entry stored for `InstanceId'.
-spec forget_allocated_resources(resource_id()) -> ok.
forget_allocated_resources(InstanceId) ->
    Deleted = ets:delete(?RESOURCE_ALLOCATION_TAB, InstanceId),
    true = Deleted,
    ok.
%% @doc Delete the allocation table entries of `InstanceId' stored under
%% `Key', whatever their value.
deallocate_resource(InstanceId, Key) ->
    Pattern = {InstanceId, Key, '_'},
    true = ets:match_delete(?RESOURCE_ALLOCATION_TAB, Pattern),
    ok.
%% @doc Create metrics for a resource ID.
%% Registers the metric names below with `emqx_metrics_worker' under
%% `?RES_METRICS'; `matched' is additionally passed in the last argument.
-spec create_metrics(resource_id()) -> ok.
create_metrics(ResId) ->
    MetricNames = [
        'matched',
        'retried',
        'retried.success',
        'retried.failed',
        'success',
        'late_reply',
        'failed',
        'dropped',
        'dropped.expired',
        'dropped.queue_full',
        'dropped.resource_not_found',
        'dropped.resource_stopped',
        'dropped.other',
        'received'
    ],
    MatchedOnly = [matched],
    emqx_metrics_worker:create_metrics(?RES_METRICS, ResId, MetricNames, MatchedOnly).
%% @doc Delete metrics for a resource ID.
%% Forwards to `emqx_metrics_worker:clear_metrics/2' under `?RES_METRICS'.
-spec clear_metrics(resource_id()) -> ok.
clear_metrics(ResId) ->
    Outcome = emqx_metrics_worker:clear_metrics(?RES_METRICS, ResId),
    Outcome.
%% =================================================================================
%% Return the ids of all instances for which `Filter(Id, Mod)' is true.
%%
%% Only `id' and `mod' are needed, so the instances are read directly from
%% `emqx_resource_manager:list_all/0' instead of `list_instances_verbose/0',
%% which would also fetch the metrics of every instance.
filter_instances(Filter) ->
    [
        Id
     || #{id := Id, mod := Mod} <- emqx_resource_manager:list_all(),
        Filter(Id, Mod)
    ].
%% @doc Release resources still registered for `ResourceId'.
%% When the allocation table has entries for the id, `call_stop/3' is invoked
%% with an `undefined' resource state and its result is asserted to be `ok'.
%% Nothing happens when there are no entries.
clean_allocated_resources(ResourceId, ResourceMod) ->
    HasLeftovers = emqx_resource:has_allocated_resources(ResourceId),
    case HasLeftovers of
        false ->
            ok;
        true ->
            %% The resource entries in the ETS table are erased inside
            %% `call_stop' if the call is successful.
            NoState = undefined,
            ok = call_stop(ResourceId, ResourceMod, NoState),
            ok
    end.
%% @doc Split a resource id of the form `{type}:{name}' into its two parts.
%% The type must name an existing atom. The name is validated and then
%% converted to an atom, unless the `atom_name' option is `false', in which
%% case it stays a binary. Ids that do not consist of exactly two
%% colon-separated parts are rejected with a validation error.
-spec parse_resource_id(list() | binary(), #{atom_name => boolean()}) ->
    {atom(), atom() | binary()}.
parse_resource_id(Id0, Opts) ->
    Id = bin(Id0),
    Parts = string:split(Id, ":", all),
    case Parts of
        [Type, Name] ->
            {to_type_atom(Type), validate_name(Name, Opts)};
        _ ->
            invalid_data(
                <<"should be of pattern {type}:{name}, but got: ", Id/binary>>
            )
    end.
%% Convert a binary type name into an already existing atom.
%% Throws a `validation_error' map when no such atom exists.
to_type_atom(Type) when is_binary(Type) ->
    try erlang:binary_to_existing_atom(Type, utf8) of
        TypeAtom -> TypeAtom
    catch
        _:_ ->
            Reason = <<"unknown resource type: ", Type/binary>>,
            throw(#{kind => validation_error, reason => Reason})
    end.
%% @doc Validate that the type is valid, i.e. names an existing atom.
%% Throws a JSON-map error if invalid.
-spec validate_type(binary()) -> ok.
validate_type(Type) ->
    _TypeAtom = to_type_atom(Type),
    ok.
%% Coerce an atom, a string (list) or a binary into a binary.
bin(Name) when is_atom(Name) -> atom_to_binary(Name, utf8);
bin(Chars) when is_list(Chars) -> list_to_binary(Chars);
bin(Already) when is_binary(Already) -> Already.
%% @doc Validate that the name is valid for a bridge.
%% Throws a JSON-map error if invalid. The name is checked only; it is not
%% converted to an atom.
-spec validate_name(binary()) -> ok.
validate_name(Name) ->
    KeepBinary = #{atom_name => false},
    _ = validate_name(Name, KeepBinary),
    ok.
%% @doc Tell whether `ResId' begins with `?TEST_ID_PREFIX'.
%% The prefix counts only when its first occurrence is at the very start of
%% the id.
-spec is_dry_run(resource_id()) -> boolean().
is_dry_run(ResId) ->
    FromPrefix = string:find(ResId, ?TEST_ID_PREFIX),
    FromPrefix =/= nomatch andalso string:equal(FromPrefix, ResId).
%% Validate a resource name and return it.
%%
%% A valid name is non-empty, shorter than 255 bytes, starts with a letter or
%% digit, and otherwise holds only letters, digits, `_' and `-'.
%% On success the name is returned as an atom, unless the `atom_name' option
%% is `false', in which case the binary is returned unchanged.
%% On failure a `validation_error' map is thrown (see `invalid_data/1'); its
%% `reason' is always a binary, matching the spec of `invalid_data/1'.
-spec validate_name(binary(), #{atom_name => boolean()}) -> atom() | binary().
validate_name(<<>>, _Opts) ->
    invalid_data(<<"Name cannot be empty string">>);
validate_name(Name, _Opts) when byte_size(Name) >= 255 ->
    invalid_data(<<"Name length must be less than 255">>);
validate_name(Name, Opts) ->
    case re:run(Name, <<"^[0-9a-zA-Z][-0-9a-zA-Z_]*$">>, [{capture, none}]) of
        match ->
            case maps:get(atom_name, Opts, true) of
                %% NOTE
                %% Rule may be created before bridge, thus not `list_to_existing_atom/1`,
                %% also it is infrequent user input anyway.
                true -> binary_to_atom(Name, utf8);
                false -> Name
            end;
        nomatch ->
            invalid_data(
                <<
                    "Invalid name format. The name must begin with a letter or number "
                    "(0-9, a-z, A-Z) and can only include underscores and hyphens as "
                    "non-initial characters. Got: ",
                    Name/binary
                >>
            )
    end.

%% Throw a validation error carrying `Reason' as its human-readable message.
-spec invalid_data(binary()) -> no_return().
invalid_data(Reason) -> throw(#{kind => validation_error, reason => Reason}).