diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 6e3cc046f..a5cc6b335 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -10,6 +10,7 @@ {emqx_bridge,5}. {emqx_bridge,6}. {emqx_broker,1}. +{emqx_cluster_link,1}. {emqx_cm,1}. {emqx_cm,2}. {emqx_cm,3}. diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl index 324d6dd68..a184ab36d 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl @@ -8,10 +8,14 @@ -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/http_api.hrl"). -include_lib("emqx_utils/include/emqx_utils_api.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("emqx/include/logger.hrl"). -export([ api_spec/0, paths/0, + namespace/0, + fields/1, schema/1 ]). @@ -23,6 +27,10 @@ -define(CONF_PATH, [cluster, links]). -define(TAGS, [<<"Cluster">>]). +-type cluster_name() :: binary(). + +namespace() -> "cluster_link". + api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). @@ -40,7 +48,7 @@ schema("/cluster/links") -> description => "Get cluster links configuration", tags => ?TAGS, responses => - #{200 => links_config_schema()} + #{200 => links_config_schema_response()} }, post => #{ @@ -49,7 +57,7 @@ schema("/cluster/links") -> 'requestBody' => link_config_schema(), responses => #{ - 200 => link_config_schema(), + 200 => link_config_schema_response(), 400 => emqx_dashboard_swagger:error_codes( [?BAD_REQUEST, ?ALREADY_EXISTS], @@ -68,7 +76,7 @@ schema("/cluster/links/:name") -> parameters => [param_path_name()], responses => #{ - 200 => link_config_schema(), + 200 => link_config_schema_response(), 404 => emqx_dashboard_swagger:error_codes( [?NOT_FOUND], <<"Cluster link not found">> ) @@ -95,7 +103,7 @@ schema("/cluster/links/:name") -> 'requestBody' => update_link_config_schema(), responses => #{ - 200 => link_config_schema(), + 200 => link_config_schema_response(), 404 => emqx_dashboard_swagger:error_codes( [?NOT_FOUND], <<"Cluster link not found">> ), @@ -107,44 +115,30 @@ schema("/cluster/links/:name") -> } }. +fields(link_config_response) -> + [ + {node, hoconsc:mk(binary(), #{desc => ?DESC("node")})}, + {status, hoconsc:mk(status(), #{desc => ?DESC("status")})} + | emqx_cluster_link_schema:fields("link") + ]. + %%-------------------------------------------------------------------- %% API Handler funcs %%-------------------------------------------------------------------- '/cluster/links'(get, _Params) -> - ?OK(get_raw()); + handle_list(); '/cluster/links'(post, #{body := Body = #{<<"name">> := Name}}) -> with_link( Name, return(?BAD_REQUEST('ALREADY_EXISTS', <<"Cluster link already exists">>)), - fun() -> - case emqx_cluster_link_config:create(Body) of - {ok, Res} -> - ?CREATED(Res); - {error, Reason} -> - Message = list_to_binary(io_lib:format("Create link failed ~p", [Reason])), - ?BAD_REQUEST(Message) - end - end + fun() -> handle_create(Name, Body) end ). '/cluster/links/:name'(get, #{bindings := #{name := Name}}) -> - with_link(Name, fun(Link) -> ?OK(Link) end, not_found()); + with_link(Name, fun(Link) -> handle_lookup(Name, Link) end, not_found()); '/cluster/links/:name'(put, #{bindings := #{name := Name}, body := Params0}) -> - with_link( - Name, - fun(Link) -> - Params = Params0#{<<"name">> => Name}, - case emqx_cluster_link_config:update_one_link(Params) of - {ok, Res} -> - ?OK(Res); - {error, Reason} -> - Message = list_to_binary(io_lib:format("Update link failed ~p", [Reason])), - ?BAD_REQUEST(Message) - end - end, - not_found() - ); + with_link(Name, fun() -> handle_update(Name, Params0) end, not_found()); '/cluster/links/:name'(delete, #{bindings := #{name := Name}}) -> with_link( Name, @@ -164,6 +158,48 @@ schema("/cluster/links/:name") -> %% Internal funcs %%-------------------------------------------------------------------- +handle_list() -> + Links = get_raw(), + NodeResults = get_all_link_status_cluster(), + NameToStatus = collect_all_status(NodeResults), + EmptyStatus = #{status => inconsistent, node_status => []}, + Response = + lists:map( + fun(#{<<"name">> := Name} = Link) -> + Status = maps:get(Name, NameToStatus, EmptyStatus), + maps:merge(Link, Status) + end, + Links + ), + ?OK(Response). + +handle_create(Name, Params) -> + case emqx_cluster_link_config:create(Params) of + {ok, Link} -> + ?CREATED(add_status(Name, Link)); + {error, Reason} -> + Message = list_to_binary(io_lib:format("Create link failed ~p", [Reason])), + ?BAD_REQUEST(Message) + end. + +handle_lookup(Name, Link) -> + ?OK(add_status(Name, Link)). + +add_status(Name, Link) -> + NodeResults = get_link_status_cluster(Name), + Status = collect_single_status(NodeResults), + maps:merge(Link, Status). + +handle_update(Name, Params0) -> + Params = Params0#{<<"name">> => Name}, + case emqx_cluster_link_config:update_one_link(Params) of + {ok, Link} -> + ?OK(add_status(Name, Link)); + {error, Reason} -> + Message = list_to_binary(io_lib:format("Update link failed ~p", [Reason])), + ?BAD_REQUEST(Message) + end. + get_raw() -> #{<<"cluster">> := #{<<"links">> := Links}} = emqx_config:fill_defaults( @@ -172,15 +208,130 @@ get_raw() -> ), Links. -links_config_schema() -> - emqx_cluster_link_schema:links_schema( +get_all_link_status_cluster() -> + case emqx_cluster_link_mqtt:get_all_resources_cluster() of + {error, BadResults} -> + ?SLOG(warning, #{ + msg => "cluster_link_api_all_status_bad_erpc_results", + results => BadResults + }), + []; + {ok, NodeResults} -> + NodeResults + end. + +get_link_status_cluster(Name) -> + case emqx_cluster_link_mqtt:get_resource_cluster(Name) of + {error, BadResults} -> + ?SLOG(warning, #{ + msg => "cluster_link_api_lookup_status_bad_erpc_results", + results => BadResults + }), + []; + {ok, NodeResults} -> + NodeResults + end. + +-spec collect_all_status([{node(), #{cluster_name() => _}}]) -> + #{ + cluster_name() => #{ + node := node(), + status := emqx_resource:resource_status() | inconsistent + } + }. +collect_all_status(NodeResults) -> + Reindexed = lists:foldl( + fun({Node, AllLinkData}, Acc) -> + maps:fold( + fun(Name, Data, AccIn) -> + collect_all_status1(Node, Name, Data, AccIn) + end, + Acc, + AllLinkData + ) + end, + #{}, + NodeResults + ), + maps:fold( + fun(Name, NodeToData, Acc) -> + OnlyStatus = [S || #{status := S} <- maps:values(NodeToData)], + SummaryStatus = + case lists:usort(OnlyStatus) of + [SameStatus] -> SameStatus; + _ -> inconsistent + end, + NodeStatus = lists:map( + fun({Node, #{status := S}}) -> + #{node => Node, status => S} + end, + maps:to_list(NodeToData) + ), + Acc#{ + Name => #{ + status => SummaryStatus, + node_status => NodeStatus + } + } + end, + #{}, + Reindexed + ). + +collect_all_status1(Node, Name, Data, Acc) -> + maps:update_with( + Name, + fun(Old) -> Old#{Node => Data} end, + #{Node => Data}, + Acc + ). + +collect_single_status(NodeResults) -> + NodeStatus = + lists:map( + fun + ({Node, {ok, #{status := S}}}) -> + #{node => Node, status => S}; + ({Node, {error, _}}) -> + #{node => Node, status => ?status_disconnected}; + ({Node, _}) -> + #{node => Node, status => inconsistent} + end, + NodeResults + ), + OnlyStatus = [S || #{status := S} <- NodeStatus], + SummaryStatus = + case lists:usort(OnlyStatus) of + [SameStatus] -> SameStatus; + _ -> inconsistent + end, + #{ + status => SummaryStatus, + node_status => NodeStatus + }. + +links_config_schema_response() -> + hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, link_config_response)), #{ + examples => #{<<"example">> => links_config_response_example()} + }). + +link_config_schema() -> + hoconsc:mk(emqx_cluster_link_schema:link_schema(), #{ + examples => #{<<"example">> => hd(links_config_example())} + }). + +link_config_schema_response() -> + hoconsc:mk( + hoconsc:ref(?MODULE, link_config_response), #{ - examples => #{<<"example">> => links_config_example()} + examples => #{ + <<"example">> => hd(links_config_response_example()) + } } ). -link_config_schema() -> - emqx_cluster_link_schema:link_schema(). +status() -> + hoconsc:enum([?status_connected, ?status_disconnected, ?status_connecting, inconsistent]). param_path_name() -> {name, @@ -197,6 +348,22 @@ param_path_name() -> update_link_config_schema() -> proplists:delete(name, emqx_cluster_link_schema:fields("link")). +links_config_response_example() -> + lists:map( + fun(LinkEx) -> + LinkEx#{ + <<"status">> => <<"connected">>, + <<"node_status">> => [ + #{ + <<"node">> => <<"emqx1@emqx.net">>, + <<"status">> => <<"connected">> + } + ] + } + end, + links_config_example() + ). + links_config_example() -> [ #{ @@ -229,7 +396,8 @@ with_link(Name, FoundFn, NotFoundFn) -> case emqx_cluster_link_config:link_raw(Name) of undefined -> NotFoundFn(); - Link = #{} -> + Link0 = #{} -> + Link = fill_defaults_single(Link0), {arity, Arity} = erlang:fun_info(FoundFn, arity), case Arity of 1 -> FoundFn(Link); @@ -237,6 +405,14 @@ with_link(Name, FoundFn, NotFoundFn) -> end end. +fill_defaults_single(Link0) -> + #{<<"cluster">> := #{<<"links">> := [Link]}} = + emqx_config:fill_defaults( + #{<<"cluster">> => #{<<"links">> => [Link0]}}, + #{obfuscate_sensitive_values => true} + ), + Link. + return(Response) -> fun() -> Response end. diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl index 5185803b6..3b37a304e 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl @@ -46,6 +46,16 @@ forward/2 ]). +-export([ + get_all_resources_cluster/0, + get_resource_cluster/1 +]). +%% BpAPI / RPC Targets +-export([ + get_resource_local_v1/1, + get_all_resources_local_v1/0 +]). + -define(MSG_CLIENTID_SUFFIX, ":msg:"). -define(MQTT_HOST_OPTS, #{default_port => 1883}). @@ -80,6 +90,8 @@ -define(PUB_TIMEOUT, 10_000). +-type cluster_name() :: binary(). + -spec ensure_msg_fwd_resource(map()) -> {ok, emqx_resource:resource_data() | already_started} | {error, Reason :: term()}. ensure_msg_fwd_resource(#{name := Name, resource_opts := ResOpts} = ClusterConf) -> @@ -89,10 +101,57 @@ ensure_msg_fwd_resource(#{name := Name, resource_opts := ResOpts} = ClusterConf) }, emqx_resource:create_local(?MSG_RES_ID(Name), ?RES_GROUP, ?MODULE, ClusterConf, ResOpts1). --spec remove_msg_fwd_resource(binary() | map()) -> ok | {error, Reason :: term()}. +-spec remove_msg_fwd_resource(cluster_name()) -> ok | {error, Reason :: term()}. remove_msg_fwd_resource(ClusterName) -> emqx_resource:remove_local(?MSG_RES_ID(ClusterName)). +-spec get_all_resources_cluster() -> + {ok, [{node(), #{cluster_name() => emqx_resource:resource_data()}}]} + | {error, [term()]}. +get_all_resources_cluster() -> + Nodes = emqx:running_nodes(), + Results = emqx_cluster_link_proto_v1:get_all_resources(Nodes), + sequence_multicall_results(Nodes, Results). + +-spec get_resource_cluster(cluster_name()) -> + {ok, [{node(), {ok, emqx_resource:resource_data()} | {error, not_found}}]} + | {error, [term()]}. +get_resource_cluster(ClusterName) -> + Nodes = emqx:running_nodes(), + Results = emqx_cluster_link_proto_v1:get_resource(Nodes, ClusterName), + sequence_multicall_results(Nodes, Results). + +%% RPC Target in `emqx_cluster_link_proto_v1'. +-spec get_resource_local_v1(cluster_name()) -> + {ok, emqx_resource:resource_data()} | {error, not_found}. +get_resource_local_v1(ClusterName) -> + case emqx_resource:get_instance(?MSG_RES_ID(ClusterName)) of + {ok, _ResourceGroup, ResourceData} -> + {ok, ResourceData}; + {error, not_found} -> + {error, not_found} + end. + +%% RPC Target in `emqx_cluster_link_proto_v1'. +-spec get_all_resources_local_v1() -> #{cluster_name() => emqx_resource:resource_data()}. +get_all_resources_local_v1() -> + lists:foldl( + fun + (?MSG_RES_ID(Name) = Id, Acc) -> + case emqx_resource:get_instance(Id) of + {ok, ?RES_GROUP, ResourceData} -> + Acc#{Name => ResourceData}; + _ -> + Acc + end; + (_Id, Acc) -> + %% Doesn't follow the naming pattern; manually crafted? + Acc + end, + #{}, + emqx_resource:list_group_instances(?RES_GROUP) + ). + %%-------------------------------------------------------------------- %% emqx_resource callbacks (message forwarding) %%-------------------------------------------------------------------- @@ -419,3 +478,16 @@ emqtt_client_opts(ClientIdSuffix, ClusterConf) -> #{clientid := BaseClientId} = Opts = emqx_cluster_link_config:mk_emqtt_options(ClusterConf), ClientId = emqx_bridge_mqtt_lib:clientid_base([BaseClientId, ClientIdSuffix]), Opts#{clientid => ClientId}. + +-spec sequence_multicall_results([node()], emqx_rpc:erpc_multicall(term())) -> + {ok, [{node(), term()}]} | {error, [term()]}. +sequence_multicall_results(Nodes, Results) -> + case lists:partition(fun is_ok/1, lists:zip(Nodes, Results)) of + {OkResults, []} -> + {ok, [{Node, Res} || {Node, {ok, Res}} <- OkResults]}; + {_OkResults, BadResults} -> + {error, BadResults} + end. + +is_ok({_Node, {ok, _}}) -> true; +is_ok(_) -> false. diff --git a/apps/emqx_cluster_link/src/proto/emqx_cluster_link_proto_v1.erl b/apps/emqx_cluster_link/src/proto/emqx_cluster_link_proto_v1.erl new file mode 100644 index 000000000..725bb8afc --- /dev/null +++ b/apps/emqx_cluster_link/src/proto/emqx_cluster_link_proto_v1.erl @@ -0,0 +1,31 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_cluster_link_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + get_resource/2, + get_all_resources/1 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +-define(TIMEOUT, 15000). + +introduced_in() -> + "5.7.2". + +-spec get_resource([node()], binary()) -> + emqx_rpc:erpc_multicall({ok, emqx_resource:resource_data()} | {error, not_found}). +get_resource(Nodes, ClusterName) -> + erpc:multicall(Nodes, emqx_cluster_link_mqtt, get_resource_local_v1, [ClusterName], ?TIMEOUT). + +-spec get_all_resources([node()]) -> + emqx_rpc:erpc_multicall(#{binary() => emqx_resource:resource_data()}). +get_all_resources(Nodes) -> + erpc:multicall(Nodes, emqx_cluster_link_mqtt, get_all_resources_local_v1, [], ?TIMEOUT). diff --git a/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl b/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl index 5bb1c377a..5c136925d 100644 --- a/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl +++ b/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl @@ -37,6 +37,8 @@ "-----END CERTIFICATE-----" >>). +-define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)). + %%------------------------------------------------------------------------------ %% CT boilerplate %%------------------------------------------------------------------------------ @@ -67,10 +69,36 @@ end_per_suite(Config) -> auth_header() -> emqx_mgmt_api_test_util:auth_header_(). +init_per_testcase(t_status = TestCase, Config) -> + ok = emqx_cth_suite:stop_apps([emqx_dashboard]), + SourceClusterSpec = emqx_cluster_link_SUITE:mk_source_cluster(TestCase, Config), + TargetClusterSpec = emqx_cluster_link_SUITE:mk_target_cluster(TestCase, Config), + SourceNodes = [SN1 | _] = emqx_cth_cluster:start(SourceClusterSpec), + TargetNodes = emqx_cth_cluster:start(TargetClusterSpec), + emqx_cluster_link_SUITE:start_cluster_link(SourceNodes ++ TargetNodes, Config), + erpc:call(SN1, emqx_cth_suite, start_apps, [ + [emqx_management, emqx_mgmt_api_test_util:emqx_dashboard()], + #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)} + ]), + [ + {source_nodes, SourceNodes}, + {target_nodes, TargetNodes} + | Config + ]; init_per_testcase(_TC, Config) -> {ok, _} = emqx_cluster_link_config:update([]), Config. +end_per_testcase(t_status, Config) -> + SourceNodes = ?config(source_nodes, Config), + TargetNodes = ?config(target_nodes, Config), + ok = emqx_cth_cluster:stop(SourceNodes), + ok = emqx_cth_cluster:stop(TargetNodes), + _ = emqx_cth_suite:start_apps( + [emqx_mgmt_api_test_util:emqx_dashboard()], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + ok; end_per_testcase(_TC, _Config) -> ok. @@ -158,6 +186,8 @@ t_put_invalid(_Config) -> update_link(Name, maps:remove(<<"server">>, link_params())) ). +%% Tests a sequence of CRUD operations and their expected responses, for common use cases +%% and configuration states. t_crud(_Config) -> %% No links initially. ?assertMatch({200, []}, list()), @@ -167,13 +197,43 @@ t_crud(_Config) -> ?assertMatch({404, _}, update_link(NameA, link_params())), Params1 = link_params(), - ?assertMatch({201, #{<<"name">> := NameA}}, create_link(NameA, Params1)), + ?assertMatch( + {201, #{ + <<"name">> := NameA, + <<"status">> := _, + <<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _] + }}, + create_link(NameA, Params1) + ), ?assertMatch({400, #{<<"code">> := <<"ALREADY_EXISTS">>}}, create_link(NameA, Params1)), - ?assertMatch({200, [#{<<"name">> := NameA}]}, list()), - ?assertMatch({200, #{<<"name">> := NameA}}, get_link(NameA)), + ?assertMatch( + {200, [ + #{ + <<"name">> := NameA, + <<"status">> := _, + <<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _] + } + ]}, + list() + ), + ?assertMatch( + {200, #{ + <<"name">> := NameA, + <<"status">> := _, + <<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _] + }}, + get_link(NameA) + ), Params2 = Params1#{<<"pool_size">> := 2}, - ?assertMatch({200, #{<<"name">> := NameA}}, update_link(NameA, Params2)), + ?assertMatch( + {200, #{ + <<"name">> := NameA, + <<"status">> := _, + <<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _] + }}, + update_link(NameA, Params2) + ), ?assertMatch({204, _}, delete_link(NameA)), ?assertMatch({404, _}, delete_link(NameA)), @@ -182,3 +242,147 @@ t_crud(_Config) -> ?assertMatch({200, []}, list()), ok. + +%% Verifies the behavior of reported status under different conditions when listing all +%% links and when fetching a specific link. +t_status(Config) -> + [SN1 | _] = ?config(source_nodes, Config), + Name = <<"cl.target">>, + ?assertMatch( + {200, [ + #{ + <<"status">> := <<"connected">>, + <<"node_status">> := [ + #{ + <<"node">> := _, + <<"status">> := <<"connected">> + }, + #{ + <<"node">> := _, + <<"status">> := <<"connected">> + } + ] + } + ]}, + list() + ), + ?assertMatch( + {200, #{ + <<"status">> := <<"connected">>, + <<"node_status">> := [ + #{ + <<"node">> := _, + <<"status">> := <<"connected">> + }, + #{ + <<"node">> := _, + <<"status">> := <<"connected">> + } + ] + }}, + get_link(Name) + ), + + %% If one of the nodes reports a different status, the cluster is inconsistent. + ProtoMod = emqx_cluster_link_proto_v1, + ?ON(SN1, begin + ok = meck:new(ProtoMod, [no_link, passthrough, no_history]), + meck:expect(ProtoMod, get_all_resources, fun(Nodes) -> + [Res1, {ok, Res2A} | Rest] = meck:passthrough([Nodes]), + %% Res2A :: #{cluster_name() => emqx_resource:resource_data()} + Res2B = maps:map(fun(_, Data) -> Data#{status := disconnected} end, Res2A), + [Res1, {ok, Res2B} | Rest] + end), + meck:expect(ProtoMod, get_resource, fun(Nodes, LinkName) -> + [Res1, {ok, {ok, Res2A}} | Rest] = meck:passthrough([Nodes, LinkName]), + Res2B = Res2A#{status := disconnected}, + [Res1, {ok, {ok, Res2B}} | Rest] + end) + end), + ?assertMatch( + {200, [ + #{ + <<"status">> := <<"inconsistent">>, + <<"node_status">> := [ + #{ + <<"node">> := _, + <<"status">> := <<"connected">> + }, + #{ + <<"node">> := _, + <<"status">> := <<"disconnected">> + } + ] + } + ]}, + list() + ), + ?assertMatch( + {200, #{ + <<"status">> := <<"inconsistent">>, + <<"node_status">> := [ + #{ + <<"node">> := _, + <<"status">> := <<"connected">> + }, + #{ + <<"node">> := _, + <<"status">> := <<"disconnected">> + } + ] + }}, + get_link(Name) + ), + + %% Simulating erpc failures + ?ON(SN1, begin + meck:expect(ProtoMod, get_all_resources, fun(Nodes) -> + [Res1, _ | Rest] = meck:passthrough([Nodes]), + [Res1, {error, {erpc, noconnection}} | Rest] + end), + meck:expect(ProtoMod, get_resource, fun(Nodes, LinkName) -> + [Res1, _ | Rest] = meck:passthrough([Nodes, LinkName]), + [Res1, {error, {erpc, noconnection}} | Rest] + end) + end), + ?assertMatch( + {200, [ + #{ + <<"status">> := <<"inconsistent">>, + <<"node_status">> := [] + } + ]}, + list() + ), + ?assertMatch( + {200, #{ + <<"status">> := <<"inconsistent">>, + <<"node_status">> := [] + }}, + get_link(Name) + ), + %% Simulate another inconsistency + ?ON(SN1, begin + meck:expect(ProtoMod, get_resource, fun(Nodes, LinkName) -> + [Res1, _ | Rest] = meck:passthrough([Nodes, LinkName]), + [Res1, {ok, {error, not_found}} | Rest] + end) + end), + ?assertMatch( + {200, #{ + <<"status">> := <<"inconsistent">>, + <<"node_status">> := [ + #{ + <<"node">> := _, + <<"status">> := <<"connected">> + }, + #{ + <<"node">> := _, + <<"status">> := <<"disconnected">> + } + ] + }}, + get_link(Name) + ), + + ok.