diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 3747a4671..ea47c30e8 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -173,20 +173,24 @@ lookup(Type, Name) -> Channels = maps:get(added_channels, InstanceData, #{}), BridgeV2Id = id(Type, Name, BridgeConnector), ChannelStatus = maps:get(BridgeV2Id, Channels, undefined), - DisplayBridgeV2Status = + {DisplayBridgeV2Status, ErrorMsg} = case ChannelStatus of - {error, undefined} -> <<"Unknown reason">>; - {error, Reason} -> emqx_utils:readable_error_msg(Reason); - connected -> <<"connected">>; - connecting -> <<"connecting">>; - Error -> emqx_utils:readable_error_msg(Error) + #{status := connected} -> + {connected, <<"">>}; + #{status := Status, error := undefined} -> + {Status, <<"Unknown reason">>}; + #{status := Status, error := Error} -> + {Status, emqx_utils:readable_error_msg(Error)}; + undefined -> + {disconnected, <<"Pending installation">>} end, {ok, #{ type => Type, name => Name, raw_config => RawConf, resource_data => InstanceData, - status => DisplayBridgeV2Status + status => DisplayBridgeV2Status, + error => ErrorMsg }} end. @@ -526,10 +530,10 @@ create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) -> ConnectorId, ChannelTestId ), case HealthCheckResult of - {error, Reason} -> - {error, Reason}; - _ -> - ok + #{status := connected} -> + ok; + #{status := Status, error := Error} -> + {error, {Status, Error}} end end end, @@ -1032,6 +1036,7 @@ lookup_and_transform_to_bridge_v1_helper( BridgeV1Tmp = maps:put(raw_config, BridgeV1Config2, BridgeV2), BridgeV1 = maps:remove(status, BridgeV1Tmp), BridgeV2Status = maps:get(status, BridgeV2, undefined), + BridgeV2Error = maps:get(error, BridgeV2, undefined), ResourceData1 = maps:get(resource_data, BridgeV1, #{}), %% Replace id in resouce data BridgeV1Id = <<"bridge:", (bin(BridgeV1Type))/binary, ":", (bin(BridgeName))/binary>>, @@ -1040,12 +1045,12 @@ lookup_and_transform_to_bridge_v1_helper( case ConnectorStatus of connected -> case BridgeV2Status of - <<"connected">> -> + connected -> %% No need to modify the status {ok, BridgeV1#{resource_data => ResourceData2}}; NotConnected -> - ResourceData3 = maps:put(status, connecting, ResourceData2), - ResourceData4 = maps:put(error, NotConnected, ResourceData3), + ResourceData3 = maps:put(status, NotConnected, ResourceData2), + ResourceData4 = maps:put(error, BridgeV2Error, ResourceData3), BridgeV1Final = maps:put(resource_data, ResourceData4, BridgeV1), {ok, BridgeV1Final} end; diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index 6e15887c8..837425888 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -238,12 +238,12 @@ t_create_dry_run_fail_add_channel(_) -> {error, Msg} end), Conf1 = (bridge_config())#{on_add_channel_fun => OnAddChannel1}, - {error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1), + {error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1), OnAddChannel2 = wrap_fun(fun() -> throw(Msg) end), Conf2 = (bridge_config())#{on_add_channel_fun => OnAddChannel2}, - {error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf2), + {error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf2), ok. t_create_dry_run_fail_get_channel_status(_) -> @@ -252,7 +252,7 @@ t_create_dry_run_fail_get_channel_status(_) -> {error, Msg} end), Conf1 = (bridge_config())#{on_get_channel_status_fun => Fun1}, - {error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1), + {error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1), Fun2 = wrap_fun(fun() -> throw(Msg) end), @@ -280,7 +280,9 @@ t_is_valid_bridge_v1(_) -> t_manual_health_check(_) -> {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, bridge_config()), %% Run a health check for the bridge - connected = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge), + #{error := undefined, status := connected} = emqx_bridge_v2:health_check( + bridge_type(), my_test_bridge + ), ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), ok. @@ -290,7 +292,9 @@ t_manual_health_check_exception(_) -> }, {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf), %% Run a health check for the bridge - {error, _} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge), + #{error := my_error, status := disconnected} = emqx_bridge_v2:health_check( + bridge_type(), my_test_bridge + ), ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), ok. @@ -300,7 +304,9 @@ t_manual_health_check_exception_error(_) -> }, {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf), %% Run a health check for the bridge - {error, _} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge), + #{error := _, status := disconnected} = emqx_bridge_v2:health_check( + bridge_type(), my_test_bridge + ), ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), ok. @@ -310,7 +316,9 @@ t_manual_health_check_error(_) -> }, {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf), %% Run a health check for the bridge - {error, my_error} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge), + #{error := my_error, status := disconnected} = emqx_bridge_v2:health_check( + bridge_type(), my_test_bridge + ), ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), ok. @@ -484,6 +492,83 @@ t_send_message_unhealthy_connector(_) -> ets:delete(ResponseETS), ok. +t_connector_connected_to_connecting_to_connected_no_channel_restart(_) -> + ResponseETS = ets:new(response_ets, [public]), + ets:insert(ResponseETS, {on_start_value, conf}), + ets:insert(ResponseETS, {on_get_status_value, connected}), + OnStartFun = wrap_fun(fun(Conf) -> + case ets:lookup_element(ResponseETS, on_start_value, 2) of + conf -> + {ok, Conf}; + V -> + V + end + end), + OnGetStatusFun = wrap_fun(fun() -> + ets:lookup_element(ResponseETS, on_get_status_value, 2) + end), + OnAddChannelCntr = counters:new(1, []), + OnAddChannelFun = wrap_fun(fun(_InstId, ConnectorState, _ChannelId, _ChannelConfig) -> + counters:add(OnAddChannelCntr, 1, 1), + {ok, ConnectorState} + end), + ConConfig = emqx_utils_maps:deep_merge(con_config(), #{ + <<"on_start_fun">> => OnStartFun, + <<"on_get_status_fun">> => OnGetStatusFun, + <<"on_add_channel_fun">> => OnAddChannelFun, + <<"resource_opts">> => #{<<"start_timeout">> => 100} + }), + ConName = ?FUNCTION_NAME, + {ok, _} = emqx_connector:create(con_type(), ConName, ConConfig), + BridgeConf = (bridge_config())#{ + <<"connector">> => atom_to_binary(ConName) + }, + {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, BridgeConf), + %% Wait until on_add_channel_fun is called at least once + wait_until(fun() -> + counters:get(OnAddChannelCntr, 1) =:= 1 + end), + 1 = counters:get(OnAddChannelCntr, 1), + %% We change the status of the connector + ets:insert(ResponseETS, {on_get_status_value, connecting}), + %% Wait until the status is changed + wait_until(fun() -> + {ok, BridgeData} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge), + maps:get(status, BridgeData) =:= connecting + end), + {ok, BridgeData1} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge), + ct:pal("Bridge V2 status changed to: ~p", [maps:get(status, BridgeData1)]), + %% We change the status again back to connected + ets:insert(ResponseETS, {on_get_status_value, connected}), + %% Wait until the status is connected again + wait_until(fun() -> + {ok, BridgeData2} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge), + maps:get(status, BridgeData2) =:= connected + end), + %% On add channel should not have been called again + 1 = counters:get(OnAddChannelCntr, 1), + %% We change the status to an error + ets:insert(ResponseETS, {on_get_status_value, {error, my_error}}), + %% Wait until the status is changed + wait_until(fun() -> + {ok, BridgeData2} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge), + maps:get(status, BridgeData2) =:= disconnected + end), + %% Now we go back to connected + ets:insert(ResponseETS, {on_get_status_value, connected}), + wait_until(fun() -> + {ok, BridgeData2} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge), + maps:get(status, BridgeData2) =:= connected + end), + %% Now the channel should have been removed and added again + wait_until(fun() -> + counters:get(OnAddChannelCntr, 1) =:= 2 + end), + ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), + ok = emqx_connector:remove(con_type(), ConName), + ets:delete(ResponseETS), + ok. + t_unhealthy_channel_alarm(_) -> Conf = (bridge_config())#{ <<"on_get_channel_status_fun">> => @@ -720,3 +805,20 @@ t_remove_multiple_connectors_being_referenced_without_channels(_Config) -> end ), ok. + +%% Helper Functions + +wait_until(Fun) -> + wait_until(Fun, 5000). + +wait_until(Fun, Timeout) when Timeout >= 0 -> + case Fun() of + true -> + ok; + false -> + IdleTime = 100, + timer:sleep(IdleTime), + wait_until(Fun, Timeout - IdleTime) + end; +wait_until(_, _) -> + ct:fail("Wait until event did not happen"). diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl index a84d6b4b2..0138832a0 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl @@ -54,6 +54,14 @@ on_add_channel( ) -> Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef), Fun(); +on_add_channel( + InstId, + #{on_add_channel_fun := FunRef} = ConnectorState, + ChannelId, + ChannelConfig +) -> + Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef), + Fun(InstId, ConnectorState, ChannelId, ChannelConfig); on_add_channel( _InstId, State, @@ -118,8 +126,8 @@ on_get_channel_status( ChannelId, State ) -> - Channels = maps:get(channels, State), - ChannelState = maps:get(ChannelId, Channels), + Channels = maps:get(channels, State, #{}), + ChannelState = maps:get(ChannelId, Channels, #{}), case ChannelState of #{on_get_channel_status_fun := FunRef} -> Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef), diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index ef9e6cdf6..9a28ed26a 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -570,7 +570,9 @@ t_nonexistent_topic(_Config) -> erlang:list_to_atom(Type), erlang:list_to_atom(Name), Conf ), % TODO: make sure the user facing APIs for Bridge V1 also get this error - {error, _} = emqx_bridge_v2:health_check(?BRIDGE_TYPE_V2, list_to_atom(Name)), + #{status := disconnected, error := #{error := unknown_kafka_topic}} = emqx_bridge_v2:health_check( + ?BRIDGE_TYPE_V2, list_to_atom(Name) + ), ok = emqx_bridge:remove(list_to_atom(Type), list_to_atom(Name)), delete_all_bridges(), ok. diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index aabb4d46e..37c2e2325 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -119,7 +119,7 @@ t_health_check(_) -> ConnectorConfig = connector_config(), {ok, _} = emqx_connector:create(?TYPE, test_connector3, ConnectorConfig), {ok, _} = emqx_bridge_v2:create(?TYPE, test_bridge_v2, BridgeV2Config), - connected = emqx_bridge_v2:health_check(?TYPE, test_bridge_v2), + #{status := connected} = emqx_bridge_v2:health_check(?TYPE, test_bridge_v2), ok = emqx_bridge_v2:remove(?TYPE, test_bridge_v2), %% Check behaviour when bridge does not exist {error, bridge_not_found} = emqx_bridge_v2:health_check(?TYPE, test_bridge_v2), diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index a67677478..cb3eb44fe 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -200,6 +200,7 @@ -callback on_get_channel_status(resource_id(), channel_id(), resource_state()) -> channel_status() + | {channel_status(), Reason :: term()} | {error, term()}. -callback query_mode(Config :: term()) -> query_mode(). @@ -457,7 +458,7 @@ health_check(ResId) -> emqx_resource_manager:health_check(ResId). -spec channel_health_check(resource_id(), channel_id()) -> - {ok, resource_status()} | {error, term()}. + #{status := channel_status(), error := term(), any() => any()}. channel_health_check(ResId, ChannelId) -> emqx_resource_manager:channel_health_check(ResId, ChannelId). @@ -534,6 +535,7 @@ call_health_check(ResId, Mod, ResourceState) -> -spec call_channel_health_check(resource_id(), channel_id(), module(), resource_state()) -> channel_status() + | {channel_status(), Reason :: term()} | {error, term()}. call_channel_health_check(ResId, ChannelId, Mod, ResourceState) -> ?SAFE_CALL(Mod:on_get_channel_status(ResId, ChannelId, ResourceState)). diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 13d9ad2de..060a97f83 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1112,11 +1112,21 @@ is_channel_id(Id) -> %% There is no need to query the conncector if the channel is not %% installed as the query will fail anyway. pre_query_channel_check({Id, _} = _Request, Channels) when - is_map_key(Id, Channels), - (map_get(Id, Channels) =:= connected orelse map_get(Id, Channels) =:= connecting) + is_map_key(Id, Channels) -> - ok; + ChannelStatus = maps:get(Id, Channels), + case emqx_resource_manager:channel_status_is_channel_added(ChannelStatus) of + true -> + ok; + false -> + maybe_throw_channel_not_installed(Id) + end; pre_query_channel_check({Id, _} = _Request, _Channels) -> + maybe_throw_channel_not_installed(Id); +pre_query_channel_check(_Request, _Channels) -> + ok. + +maybe_throw_channel_not_installed(Id) -> %% Fail with a recoverable error if the channel is not installed %% so that the operation can be retried. It is emqx_resource_manager's %% responsibility to ensure that the channel installation is retried. @@ -1128,9 +1138,7 @@ pre_query_channel_check({Id, _} = _Request, _Channels) -> ); false -> ok - end; -pre_query_channel_check(_Request, _Channels) -> - ok. + end. do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 4b1c9e4a4..04d40d581 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -44,7 +44,8 @@ list_group/1, lookup_cached/1, get_metrics/1, - reset_metrics/1 + reset_metrics/1, + channel_status_is_channel_added/1 ]). -export([ @@ -306,7 +307,7 @@ health_check(ResId) -> safe_call(ResId, health_check, ?T_OPERATION). -spec channel_health_check(resource_id(), channel_id()) -> - {ok, resource_status()} | {error, term()}. + #{status := channel_status(), error := term(), any() => any()}. channel_health_check(ResId, ChannelId) -> %% Do normal health check first to trigger health checks for channels %% and update the cached health status for the channels @@ -314,7 +315,10 @@ channel_health_check(ResId, ChannelId) -> safe_call(ResId, {channel_health_check, ChannelId}, ?T_OPERATION). add_channel(ResId, ChannelId, Config) -> - safe_call(ResId, {add_channel, ChannelId, Config}, ?T_OPERATION). + Result = safe_call(ResId, {add_channel, ChannelId, Config}, ?T_OPERATION), + %% Wait for health_check to finish + _ = health_check(ResId), + Result. remove_channel(ResId, ChannelId) -> safe_call(ResId, {remove_channel, ChannelId}, ?T_OPERATION). @@ -420,6 +424,10 @@ handle_event(internal, start_resource, connecting, Data) -> start_resource(Data, undefined); handle_event(state_timeout, health_check, connecting, Data) -> handle_connecting_health_check(Data); +handle_event( + {call, From}, {remove_channel, ChannelId}, connecting = _State, Data +) -> + handle_remove_channel(From, ChannelId, Data); %% State: CONNECTED %% The connected state is entered after a successful on_start/2 of the callback mod %% and successful health_checks @@ -459,7 +467,7 @@ handle_event( handle_event( {call, From}, {remove_channel, ChannelId}, _State, Data ) -> - handle_not_connected_remove_channel(From, ChannelId, Data); + handle_not_connected_and_not_connecting_remove_channel(From, ChannelId, Data); handle_event( {call, From}, get_channels, _State, Data ) -> @@ -570,7 +578,7 @@ add_channels(Data) -> Channels = Data#data.added_channels, NewChannels = lists:foldl( fun({ChannelID, _Conf}, Acc) -> - maps:put(ChannelID, {error, connecting}, Acc) + maps:put(ChannelID, channel_status(), Acc) end, Channels, ChannelIDConfigTuples @@ -589,7 +597,11 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) -> AddedChannelsMap = Data#data.added_channels, %% Set the channel status to connecting to indicate that %% we have not yet performed the initial health_check - NewAddedChannelsMap = maps:put(ChannelID, connecting, AddedChannelsMap), + NewAddedChannelsMap = maps:put( + ChannelID, + channel_status_new_waiting_for_health_check(), + AddedChannelsMap + ), NewData = Data#data{ state = NewState, added_channels = NewAddedChannelsMap @@ -603,7 +615,11 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) -> reason => Reason }), AddedChannelsMap = Data#data.added_channels, - NewAddedChannelsMap = maps:put(ChannelID, Error, AddedChannelsMap), + NewAddedChannelsMap = maps:put( + ChannelID, + channel_status(Error), + AddedChannelsMap + ), NewData = Data#data{ added_channels = NewAddedChannelsMap }, @@ -680,65 +696,44 @@ make_test_id() -> RandId = iolist_to_binary(emqx_utils:gen_id(16)), <>. -handle_add_channel(From, Data, ChannelId, ChannelConfig) -> +handle_add_channel(From, Data, ChannelId, Config) -> Channels = Data#data.added_channels, - case maps:get(ChannelId, Channels, {error, not_added}) of - {error, _Reason} -> + case + channel_status_is_channel_added( + maps:get( + ChannelId, + Channels, + channel_status() + ) + ) + of + false -> %% The channel is not installed in the connector state - %% We need to install it - handle_add_channel_need_insert(From, Data, ChannelId, Data, ChannelConfig); - _ -> + %% We insert it into the channels map and let the health check + %% take care of the rest + NewChannels = maps:put(ChannelId, channel_status_new_with_config(Config), Channels), + NewData = Data#data{added_channels = NewChannels}, + {keep_state, update_state(NewData, Data), [ + {reply, From, ok} + ]}; + true -> %% The channel is already installed in the connector state %% We don't need to install it again {keep_state_and_data, [{reply, From, ok}]} end. -handle_add_channel_need_insert(From, Data, ChannelId, Data, ChannelConfig) -> - NewData = add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig), - %% Trigger a health check to raise alarm if channel is not healthy - {keep_state, NewData, [{reply, From, ok}, {state_timeout, 0, health_check}]}. - -add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig) -> - case - emqx_resource:call_add_channel( - Data#data.id, Data#data.mod, Data#data.state, ChannelId, ChannelConfig - ) - of - {ok, NewState} -> - AddedChannelsMap = Data#data.added_channels, - %% Setting channel status to connecting to indicate that an health check - %% has not been performed yet - NewAddedChannelsMap = maps:put(ChannelId, connecting, AddedChannelsMap), - UpdatedData = Data#data{ - state = NewState, - added_channels = NewAddedChannelsMap - }, - update_state(UpdatedData, Data); - {error, _Reason} = Error -> - ChannelsMap = Data#data.added_channels, - NewChannelsMap = maps:put(ChannelId, Error, ChannelsMap), - UpdatedData = Data#data{ - added_channels = NewChannelsMap - }, - update_state(UpdatedData, Data) - end. - handle_not_connected_add_channel(From, ChannelId, State, Data) -> %% When state is not connected the channel will be added to the channels %% map but nothing else will happen. - Channels = Data#data.added_channels, - NewChannels = maps:put(ChannelId, {error, resource_not_operational}, Channels), - NewData1 = Data#data{added_channels = NewChannels}, - %% Do channel health check to trigger alarm - NewData2 = channels_health_check(State, NewData1), - {keep_state, update_state(NewData2, Data), [{reply, From, ok}]}. + NewData = add_channel_status_if_not_exists(Data, ChannelId, State), + {keep_state, update_state(NewData, Data), [{reply, From, ok}]}. handle_remove_channel(From, ChannelId, Data) -> Channels = Data#data.added_channels, %% Deactivate alarm _ = maybe_clear_alarm(ChannelId), - case maps:get(ChannelId, Channels, {error, not_added}) of - {error, _} -> + case channel_status_is_channel_added(maps:get(ChannelId, Channels, channel_status())) of + false -> %% The channel is already not installed in the connector state. %% We still need to remove it from the added_channels map AddedChannels = Data#data.added_channels, @@ -747,7 +742,7 @@ handle_remove_channel(From, ChannelId, Data) -> added_channels = NewAddedChannels }, {keep_state, NewData, [{reply, From, ok}]}; - _ -> + true -> %% The channel is installed in the connector state handle_remove_channel_exists(From, ChannelId, Data) end. @@ -777,9 +772,10 @@ handle_remove_channel_exists(From, ChannelId, Data) -> {keep_state_and_data, [{reply, From, Error}]} end. -handle_not_connected_remove_channel(From, ChannelId, Data) -> - %% When state is not connected the channel will be removed from the channels - %% map but nothing else will happen. +handle_not_connected_and_not_connecting_remove_channel(From, ChannelId, Data) -> + %% When state is not connected and not connecting the channel will be removed + %% from the channels map but nothing else will happen since the channel + %% is not addded/installed in the resource state. Channels = Data#data.added_channels, NewChannels = maps:remove(ChannelId, Channels), NewData = Data#data{added_channels = NewChannels}, @@ -796,7 +792,7 @@ handle_manually_health_check(From, Data) -> ). handle_manually_channel_health_check(From, #data{state = undefined}, _ChannelId) -> - {keep_state_and_data, [{reply, From, {ok, disconnected}}]}; + {keep_state_and_data, [{reply, From, channel_status({error, resource_disconnected})}]}; handle_manually_channel_health_check( From, #data{added_channels = Channels} = _Data, @@ -810,10 +806,11 @@ handle_manually_channel_health_check( _Data, _ChannelId ) -> - {keep_state_and_data, [{reply, From, {error, channel_not_found}}]}. + {keep_state_and_data, [{reply, From, channel_status({error, channel_not_found})}]}. get_channel_status_channel_added(#data{id = ResId, mod = Mod, state = State}, ChannelId) -> - emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State). + RawStatus = emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State), + channel_status(RawStatus). handle_connecting_health_check(Data) -> with_health_check( @@ -833,9 +830,9 @@ handle_connected_health_check(Data) -> with_health_check( Data, fun - (connected, UpdatedData) -> - {keep_state, channels_health_check(connected, UpdatedData), - health_check_actions(UpdatedData)}; + (connected, UpdatedData0) -> + UpdatedData1 = channels_health_check(connected, UpdatedData0), + {keep_state, UpdatedData1, health_check_actions(UpdatedData1)}; (Status, UpdatedData) -> ?SLOG(warning, #{ msg => "health_check_failed", @@ -861,20 +858,59 @@ with_health_check(#data{error = PrevError} = Data, Func) -> channels_health_check(connected = _ResourceStatus, Data0) -> Channels = maps:to_list(Data0#data.added_channels), - %% All channels with an error status are considered not added + %% All channels with a stutus different from connected or connecting are + %% not added ChannelsNotAdded = [ ChannelId || {ChannelId, Status} <- Channels, - not is_channel_added(Status) + not channel_status_is_channel_added(Status) ], %% Attempt to add channels that are not added - ChannelsNotAddedWithConfigs = get_config_for_channels(Data0, ChannelsNotAdded), + ChannelsNotAddedWithConfigs = + get_config_for_channels(Data0, ChannelsNotAdded), Data1 = add_channels_in_list(ChannelsNotAddedWithConfigs, Data0), %% Now that we have done the adding, we can get the status of all channels Data2 = channel_status_for_all_channels(Data1), update_state(Data2, Data0); +channels_health_check(connecting, Data0) -> + %% Whenever the resource is connecting: + %% 1. Change the status of all added channels to connecting + %% 2. Raise alarms (TODO: if it is a probe we should not raise alarms) + Channels = Data0#data.added_channels, + ChannelsToChangeStatusFor = [ + ChannelId + || {ChannelId, Status} <- maps:to_list(Channels), + channel_status_is_channel_added(Status) + ], + ChannelsWithNewStatuses = + [ + {ChannelId, channel_status({connecting, resource_is_connecting})} + || ChannelId <- ChannelsToChangeStatusFor + ], + %% Update the channels map + NewChannels = lists:foldl( + fun({ChannelId, NewStatus}, Acc) -> + maps:update(ChannelId, NewStatus, Acc) + end, + Channels, + ChannelsWithNewStatuses + ), + ChannelsWithNewAndPrevErrorStatuses = + [ + {ChannelId, NewStatus, maps:get(ChannelId, Channels)} + || {ChannelId, NewStatus} <- maps:to_list(NewChannels) + ], + %% Raise alarms for all channels + lists:foreach( + fun({ChannelId, Status, PrevStatus}) -> + maybe_alarm(connecting, ChannelId, Status, PrevStatus) + end, + ChannelsWithNewAndPrevErrorStatuses + ), + Data1 = Data0#data{added_channels = NewChannels}, + update_state(Data1, Data0); channels_health_check(ResourceStatus, Data0) -> - %% Whenever the resource is not connected: + %% Whenever the resource is not connected and not connecting: %% 1. Remove all added channels %% 2. Change the status to an error status %% 3. Raise alarms @@ -882,13 +918,20 @@ channels_health_check(ResourceStatus, Data0) -> ChannelsToRemove = [ ChannelId || {ChannelId, Status} <- maps:to_list(Channels), - is_channel_added(Status) + channel_status_is_channel_added(Status) ], Data1 = remove_channels_in_list(ChannelsToRemove, Data0, true), ChannelsWithNewAndOldStatuses = [ {ChannelId, OldStatus, - {error, resource_not_connected_channel_error_msg(ResourceStatus, ChannelId, Data1)}} + channel_status( + {error, + resource_not_connected_channel_error_msg( + ResourceStatus, + ChannelId, + Data1 + )} + )} || {ChannelId, OldStatus} <- maps:to_list(Data1#data.added_channels) ], %% Raise alarms @@ -928,18 +971,19 @@ channel_status_for_all_channels(Data) -> AddedChannelsWithOldAndNewStatus = [ {ChannelId, OldStatus, get_channel_status_channel_added(Data, ChannelId)} || {ChannelId, OldStatus} <- Channels, - is_channel_added(OldStatus) + channel_status_is_channel_added(OldStatus) ], - %% Remove the added channels with a new error statuses + %% Remove the added channels with a a status different from connected or connecting ChannelsToRemove = [ ChannelId - || {ChannelId, _, {error, _}} <- AddedChannelsWithOldAndNewStatus + || {ChannelId, _, NewStatus} <- AddedChannelsWithOldAndNewStatus, + not channel_status_is_channel_added(NewStatus) ], Data1 = remove_channels_in_list(ChannelsToRemove, Data, true), %% Raise/clear alarms lists:foreach( fun - ({ID, _OldStatus, connected}) -> + ({ID, _OldStatus, #{status := connected}}) -> _ = maybe_clear_alarm(ID); ({ID, OldStatus, NewStatus}) -> _ = maybe_alarm(NewStatus, ID, NewStatus, OldStatus) @@ -958,18 +1002,14 @@ channel_status_for_all_channels(Data) -> ), Data1#data{added_channels = NewChannelsMap}. -is_channel_added({error, _}) -> - false; -is_channel_added(_) -> - true. - get_config_for_channels(Data0, ChannelsWithoutConfig) -> ResId = Data0#data.id, Mod = Data0#data.mod, Channels = emqx_resource:call_get_channels(ResId, Mod), ChannelIdToConfig = maps:from_list(Channels), + ChannelStatusMap = Data0#data.added_channels, ChannelsWithConfig = [ - {Id, maps:get(Id, ChannelIdToConfig, no_config)} + {Id, get_config_from_map_or_channel_status(Id, ChannelIdToConfig, ChannelStatusMap)} || Id <- ChannelsWithoutConfig ], %% Filter out channels without config @@ -979,6 +1019,16 @@ get_config_for_channels(Data0, ChannelsWithoutConfig) -> Conf =/= no_config ]. +get_config_from_map_or_channel_status(ChannelId, ChannelIdToConfig, ChannelStatusMap) -> + ChannelStatus = maps:get(ChannelId, ChannelStatusMap, #{}), + case maps:get(config, ChannelStatus, undefined) of + undefined -> + %% Channel config + maps:get(ChannelId, ChannelIdToConfig, no_config); + Config -> + Config + end. + update_state(Data) -> update_state(Data, undefined). @@ -1098,3 +1148,86 @@ safe_call(ResId, Message, Timeout) -> exit:{timeout, _} -> {error, timeout} end. + +%% Helper functions for chanel status data + +channel_status() -> + #{ + %% The status of the channel. Can be one of the following: + %% - disconnected: the channel is not added to the resource (error may contain the reason)) + %% - connecting: the channel has been added to the resource state but + %% either the resource status is connecting or the + %% on_channel_get_status callback has returned connecting + %% - connected: the channel is added to the resource, the resource is + %% connected and the on_channel_get_status callback has returned + %% connected. The error field should be undefined. + status => disconnected, + error => not_added_yet + }. + +%% If the channel is added with add_channel/2, the config field will be set to +%% the config. This is useful when doing probing since the config is not stored +%% anywhere else in that case. +channel_status_new_with_config(Config) -> + #{ + status => disconnected, + error => not_added_yet, + config => Config + }. + +channel_status_new_waiting_for_health_check() -> + #{ + status => connecting, + error => no_health_check_yet + }. + +channel_status({connecting, Error}) -> + #{ + status => connecting, + error => Error + }; +channel_status(connecting) -> + #{ + status => connecting, + error => <<"Not connected for unknown reason">> + }; +channel_status(connected) -> + #{ + status => connected, + error => undefined + }; +%% Probably not so useful but it is permitted to set an error even when the +%% status is connected +channel_status({connected, Error}) -> + #{ + status => connected, + error => Error + }; +channel_status({error, Reason}) -> + #{ + status => disconnected, + error => Reason + }. + +channel_status_is_channel_added(#{ + status := connected +}) -> + true; +channel_status_is_channel_added(#{ + status := connecting +}) -> + true; +channel_status_is_channel_added(_Status) -> + false. + +add_channel_status_if_not_exists(Data, ChannelId, State) -> + Channels = Data#data.added_channels, + case maps:is_key(ChannelId, Channels) of + true -> + Data; + false -> + ChannelStatus = channel_status({error, resource_not_operational}), + NewChannels = maps:put(ChannelId, ChannelStatus, Channels), + maybe_alarm(State, ChannelId, ChannelStatus, no_prev), + Data#data{added_channels = NewChannels} + end.