refactor(clusterlink): avoid unnecessary `maybe_` external_broker CB names
This commit is contained in:
parent
d578ac3f9e
commit
d5e82cdfac
|
@ -690,9 +690,9 @@ sync_route(Action, Topic, ReplyTo) ->
|
||||||
Res.
|
Res.
|
||||||
|
|
||||||
external_sync_route(add, Topic) ->
|
external_sync_route(add, Topic) ->
|
||||||
emqx_external_broker:maybe_add_route(Topic);
|
emqx_external_broker:add_route(Topic);
|
||||||
external_sync_route(delete, Topic) ->
|
external_sync_route(delete, Topic) ->
|
||||||
emqx_external_broker:maybe_delete_route(Topic).
|
emqx_external_broker:delete_route(Topic).
|
||||||
|
|
||||||
push_sync_route(Action, Topic, Opts) ->
|
push_sync_route(Action, Topic, Opts) ->
|
||||||
emqx_router_syncer:push(Action, Topic, node(), Opts).
|
emqx_router_syncer:push(Action, Topic, node(), Opts).
|
||||||
|
|
|
@ -21,14 +21,14 @@
|
||||||
|
|
||||||
-callback should_route_to_external_dests(emqx_types:message()) -> boolean().
|
-callback should_route_to_external_dests(emqx_types:message()) -> boolean().
|
||||||
|
|
||||||
-callback maybe_add_route(emqx_types:topic()) -> ok.
|
-callback add_route(emqx_types:topic()) -> ok.
|
||||||
-callback maybe_delete_route(emqx_types:topic()) -> ok.
|
-callback delete_route(emqx_types:topic()) -> ok.
|
||||||
|
|
||||||
-callback maybe_add_shared_route(emqx_types:topic(), emqx_types:group()) -> ok.
|
-callback add_shared_route(emqx_types:topic(), emqx_types:group()) -> ok.
|
||||||
-callback maybe_delete_shared_route(emqx_types:topic(), emqx_types:group()) -> ok.
|
-callback delete_shared_route(emqx_types:topic(), emqx_types:group()) -> ok.
|
||||||
|
|
||||||
-callback maybe_add_persistent_route(emqx_types:topic(), emqx_persistent_session_ds:id()) -> ok.
|
-callback add_persistent_route(emqx_types:topic(), emqx_persistent_session_ds:id()) -> ok.
|
||||||
-callback maybe_delete_persistent_route(emqx_types:topic(), emqx_persistent_session_ds:id()) -> ok.
|
-callback delete_persistent_route(emqx_types:topic(), emqx_persistent_session_ds:id()) -> ok.
|
||||||
|
|
||||||
-callback match_routes(emqx_types:topic()) -> [emqx_types:route()].
|
-callback match_routes(emqx_types:topic()) -> [emqx_types:route()].
|
||||||
|
|
||||||
|
@ -40,12 +40,12 @@
|
||||||
unregister_provider/1,
|
unregister_provider/1,
|
||||||
forward/2,
|
forward/2,
|
||||||
should_route_to_external_dests/1,
|
should_route_to_external_dests/1,
|
||||||
maybe_add_route/1,
|
add_route/1,
|
||||||
maybe_delete_route/1,
|
delete_route/1,
|
||||||
maybe_add_shared_route/2,
|
add_shared_route/2,
|
||||||
maybe_delete_shared_route/2,
|
delete_shared_route/2,
|
||||||
maybe_add_persistent_route/2,
|
add_persistent_route/2,
|
||||||
maybe_delete_persistent_route/2,
|
delete_persistent_route/2,
|
||||||
match_routes/1
|
match_routes/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -117,22 +117,22 @@ forward(ExternalDest, Delivery) ->
|
||||||
should_route_to_external_dests(Message) ->
|
should_route_to_external_dests(Message) ->
|
||||||
?safe_with_provider(?FUNCTION_NAME(Message), false).
|
?safe_with_provider(?FUNCTION_NAME(Message), false).
|
||||||
|
|
||||||
maybe_add_route(Topic) ->
|
add_route(Topic) ->
|
||||||
?safe_with_provider(?FUNCTION_NAME(Topic), ok).
|
?safe_with_provider(?FUNCTION_NAME(Topic), ok).
|
||||||
|
|
||||||
maybe_delete_route(Topic) ->
|
delete_route(Topic) ->
|
||||||
?safe_with_provider(?FUNCTION_NAME(Topic), ok).
|
?safe_with_provider(?FUNCTION_NAME(Topic), ok).
|
||||||
|
|
||||||
maybe_add_shared_route(Topic, Group) ->
|
add_shared_route(Topic, Group) ->
|
||||||
?safe_with_provider(?FUNCTION_NAME(Topic, Group), ok).
|
?safe_with_provider(?FUNCTION_NAME(Topic, Group), ok).
|
||||||
|
|
||||||
maybe_delete_shared_route(Topic, Group) ->
|
delete_shared_route(Topic, Group) ->
|
||||||
?safe_with_provider(?FUNCTION_NAME(Topic, Group), ok).
|
?safe_with_provider(?FUNCTION_NAME(Topic, Group), ok).
|
||||||
|
|
||||||
maybe_add_persistent_route(Topic, ID) ->
|
add_persistent_route(Topic, ID) ->
|
||||||
?safe_with_provider(?FUNCTION_NAME(Topic, ID), ok).
|
?safe_with_provider(?FUNCTION_NAME(Topic, ID), ok).
|
||||||
|
|
||||||
maybe_delete_persistent_route(Topic, ID) ->
|
delete_persistent_route(Topic, ID) ->
|
||||||
?safe_with_provider(?FUNCTION_NAME(Topic, ID), ok).
|
?safe_with_provider(?FUNCTION_NAME(Topic, ID), ok).
|
||||||
|
|
||||||
match_routes(Topic) ->
|
match_routes(Topic) ->
|
||||||
|
|
|
@ -92,7 +92,7 @@ on_subscribe(TopicFilter, SubOpts, #{id := SessionId, s := S0, props := Props})
|
||||||
case emqx_persistent_session_ds_state:n_subscriptions(S0) < MaxSubscriptions of
|
case emqx_persistent_session_ds_state:n_subscriptions(S0) < MaxSubscriptions of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, SessionId),
|
ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, SessionId),
|
||||||
_ = emqx_external_broker:maybe_add_persistent_route(TopicFilter, SessionId),
|
_ = emqx_external_broker:add_persistent_route(TopicFilter, SessionId),
|
||||||
{SubId, S1} = emqx_persistent_session_ds_state:new_id(S0),
|
{SubId, S1} = emqx_persistent_session_ds_state:new_id(S0),
|
||||||
{SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1),
|
{SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1),
|
||||||
SState = #{
|
SState = #{
|
||||||
|
@ -155,7 +155,7 @@ on_unsubscribe(SessionId, TopicFilter, S0) ->
|
||||||
#{session_id => SessionId, topic_filter => TopicFilter},
|
#{session_id => SessionId, topic_filter => TopicFilter},
|
||||||
ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId)
|
ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId)
|
||||||
),
|
),
|
||||||
_ = emqx_external_broker:maybe_delete_persistent_route(TopicFilter, SessionId),
|
_ = emqx_external_broker:delete_persistent_route(TopicFilter, SessionId),
|
||||||
{ok, emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0), Subscription}
|
{ok, emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0), Subscription}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -425,7 +425,7 @@ handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon
|
||||||
ok;
|
ok;
|
||||||
false ->
|
false ->
|
||||||
ok = emqx_router:do_add_route(Topic, {Group, node()}),
|
ok = emqx_router:do_add_route(Topic, {Group, node()}),
|
||||||
_ = emqx_external_broker:maybe_add_shared_route(Topic, Group),
|
_ = emqx_external_broker:add_shared_route(Topic, Group),
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
ok = maybe_insert_alive_tab(SubPid),
|
ok = maybe_insert_alive_tab(SubPid),
|
||||||
|
@ -550,7 +550,7 @@ is_alive_sub(Pid) ->
|
||||||
delete_route_if_needed({Group, Topic} = GroupTopic) ->
|
delete_route_if_needed({Group, Topic} = GroupTopic) ->
|
||||||
if_no_more_subscribers(GroupTopic, fun() ->
|
if_no_more_subscribers(GroupTopic, fun() ->
|
||||||
ok = emqx_router:do_delete_route(Topic, {Group, node()}),
|
ok = emqx_router:do_delete_route(Topic, {Group, node()}),
|
||||||
_ = emqx_external_broker:maybe_delete_shared_route(Topic, Group),
|
_ = emqx_external_broker:delete_shared_route(Topic, Group),
|
||||||
ok
|
ok
|
||||||
end).
|
end).
|
||||||
|
|
||||||
|
|
|
@ -9,12 +9,12 @@
|
||||||
-export([
|
-export([
|
||||||
register_external_broker/0,
|
register_external_broker/0,
|
||||||
unregister_external_broker/0,
|
unregister_external_broker/0,
|
||||||
maybe_add_route/1,
|
add_route/1,
|
||||||
maybe_delete_route/1,
|
delete_route/1,
|
||||||
maybe_add_shared_route/2,
|
add_shared_route/2,
|
||||||
maybe_delete_shared_route/2,
|
delete_shared_route/2,
|
||||||
maybe_add_persistent_route/2,
|
add_persistent_route/2,
|
||||||
maybe_delete_persistent_route/2,
|
delete_persistent_route/2,
|
||||||
match_routes/1,
|
match_routes/1,
|
||||||
forward/2,
|
forward/2,
|
||||||
should_route_to_external_dests/1
|
should_route_to_external_dests/1
|
||||||
|
@ -48,22 +48,22 @@ unregister_external_broker() ->
|
||||||
%% to delete it on a remote cluster.
|
%% to delete it on a remote cluster.
|
||||||
%% There is no need to push Node name as this info can be derived from
|
%% There is no need to push Node name as this info can be derived from
|
||||||
%% agent state on the remote cluster.
|
%% agent state on the remote cluster.
|
||||||
maybe_add_route(Topic) ->
|
add_route(Topic) ->
|
||||||
maybe_push_route_op(add, Topic, Topic).
|
maybe_push_route_op(add, Topic, Topic).
|
||||||
|
|
||||||
maybe_delete_route(Topic) ->
|
delete_route(Topic) ->
|
||||||
maybe_push_route_op(delete, Topic, Topic).
|
maybe_push_route_op(delete, Topic, Topic).
|
||||||
|
|
||||||
maybe_add_shared_route(Topic, Group) ->
|
add_shared_route(Topic, Group) ->
|
||||||
maybe_push_route_op(add, Topic, ?SHARED_ROUTE_ID(Topic, Group)).
|
maybe_push_route_op(add, Topic, ?SHARED_ROUTE_ID(Topic, Group)).
|
||||||
|
|
||||||
maybe_delete_shared_route(Topic, Group) ->
|
delete_shared_route(Topic, Group) ->
|
||||||
maybe_push_route_op(delete, Topic, ?SHARED_ROUTE_ID(Topic, Group)).
|
maybe_push_route_op(delete, Topic, ?SHARED_ROUTE_ID(Topic, Group)).
|
||||||
|
|
||||||
maybe_add_persistent_route(Topic, ID) ->
|
add_persistent_route(Topic, ID) ->
|
||||||
maybe_push_route_op(add, Topic, ?PERSISTENT_ROUTE_ID(Topic, ID), push_persistent_route).
|
maybe_push_route_op(add, Topic, ?PERSISTENT_ROUTE_ID(Topic, ID), push_persistent_route).
|
||||||
|
|
||||||
maybe_delete_persistent_route(Topic, ID) ->
|
delete_persistent_route(Topic, ID) ->
|
||||||
maybe_push_route_op(delete, Topic, ?PERSISTENT_ROUTE_ID(Topic, ID), push_persistent_route).
|
maybe_push_route_op(delete, Topic, ?PERSISTENT_ROUTE_ID(Topic, ID), push_persistent_route).
|
||||||
|
|
||||||
forward(DestCluster, Delivery) ->
|
forward(DestCluster, Delivery) ->
|
||||||
|
|
Loading…
Reference in New Issue