Line data Source code
1 : /*
2 : *
3 : * Copyright (c) 2021 Project CHIP Authors
4 : * All rights reserved.
5 : *
6 : * Licensed under the Apache License, Version 2.0 (the "License");
7 : * you may not use this file except in compliance with the License.
8 : * You may obtain a copy of the License at
9 : *
10 : * http://www.apache.org/licenses/LICENSE-2.0
11 : *
12 : * Unless required by applicable law or agreed to in writing, software
13 : * distributed under the License is distributed on an "AS IS" BASIS,
14 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 : * See the License for the specific language governing permissions and
16 : * limitations under the License.
17 : */
18 :
19 : /**
20 : * @file
21 : * This file defines the initiator side of a CHIP Read Interaction.
22 : *
23 : */
24 :
25 : #include <app/AppConfig.h>
26 : #include <app/InteractionModelEngine.h>
27 : #include <app/InteractionModelHelper.h>
28 : #include <app/ReadClient.h>
29 : #include <app/StatusResponse.h>
30 : #include <assert.h>
31 : #include <lib/core/TLVTypes.h>
32 : #include <lib/support/FibonacciUtils.h>
33 : #include <messaging/ReliableMessageMgr.h>
34 : #include <messaging/ReliableMessageProtocolConfig.h>
35 : #include <platform/LockTracker.h>
36 : #include <tracing/metric_event.h>
37 :
38 : #include <app-common/zap-generated/cluster-objects.h>
39 : #include <app-common/zap-generated/ids/Attributes.h>
40 : #include <app-common/zap-generated/ids/Clusters.h>
41 :
42 : namespace chip {
43 : namespace app {
44 :
45 : using Status = Protocols::InteractionModel::Status;
46 :
47 1118 : ReadClient::ReadClient(InteractionModelEngine * apImEngine, Messaging::ExchangeManager * apExchangeMgr, Callback & apCallback,
48 1118 : InteractionType aInteractionType) :
49 1118 : mExchange(*this),
50 1118 : mpCallback(apCallback), mOnConnectedCallback(HandleDeviceConnected, this),
51 2236 : mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this)
52 : {
53 1118 : assertChipStackLockedByCurrentThread();
54 :
55 1118 : mpExchangeMgr = apExchangeMgr;
56 1118 : mInteractionType = aInteractionType;
57 :
58 1118 : mpImEngine = apImEngine;
59 :
60 1118 : if (aInteractionType == InteractionType::Subscribe)
61 : {
62 314 : mpImEngine->AddReadClient(this);
63 : }
64 1118 : }
65 :
66 281 : void ReadClient::ClearActiveSubscriptionState()
67 : {
68 281 : mIsReporting = false;
69 281 : mWaitingForFirstPrimingReport = true;
70 281 : mPendingMoreChunks = false;
71 281 : mMinIntervalFloorSeconds = 0;
72 281 : mMaxInterval = 0;
73 281 : mSubscriptionId = 0;
74 281 : mIsResubscriptionScheduled = false;
75 :
76 281 : MoveToState(ClientState::Idle);
77 281 : }
78 :
79 582 : void ReadClient::StopResubscription()
80 : {
81 582 : CancelLivenessCheckTimer();
82 582 : CancelResubscribeTimer();
83 :
84 : // Only deallocate the paths if they are not already deallocated.
85 582 : if (mReadPrepareParams.mpAttributePathParamsList != nullptr || mReadPrepareParams.mpEventPathParamsList != nullptr ||
86 358 : mReadPrepareParams.mpDataVersionFilterList != nullptr)
87 : {
88 224 : mpCallback.OnDeallocatePaths(std::move(mReadPrepareParams));
89 : // Make sure we will never try to free those pointers again.
90 224 : mReadPrepareParams.mpAttributePathParamsList = nullptr;
91 224 : mReadPrepareParams.mAttributePathParamsListSize = 0;
92 224 : mReadPrepareParams.mpEventPathParamsList = nullptr;
93 224 : mReadPrepareParams.mEventPathParamsListSize = 0;
94 224 : mReadPrepareParams.mpDataVersionFilterList = nullptr;
95 224 : mReadPrepareParams.mDataVersionFilterListSize = 0;
96 : }
97 582 : }
98 :
99 1206 : ReadClient::~ReadClient()
100 : {
101 1118 : assertChipStackLockedByCurrentThread();
102 :
103 1118 : if (IsSubscriptionType())
104 : {
105 314 : StopResubscription();
106 :
107 : // Only remove ourselves from the engine's tracker list if we still continue to have a valid pointer to it.
108 : // This won't be the case if the engine shut down before this destructor was called (in which case, mpImEngine
109 : // will point to null)
110 : //
111 314 : if (mpImEngine)
112 : {
113 149 : mpImEngine->RemoveReadClient(this);
114 : }
115 : }
116 1206 : }
117 :
118 12 : uint32_t ReadClient::ComputeTimeTillNextSubscription()
119 : {
120 12 : uint32_t maxWaitTimeInMsec = 0;
121 12 : uint32_t waitTimeInMsec = 0;
122 12 : uint32_t minWaitTimeInMsec = 0;
123 :
124 12 : if (mNumRetries <= CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX)
125 : {
126 12 : maxWaitTimeInMsec = GetFibonacciForIndex(mNumRetries) * CHIP_RESUBSCRIBE_WAIT_TIME_MULTIPLIER_MS;
127 : }
128 : else
129 : {
130 0 : maxWaitTimeInMsec = CHIP_RESUBSCRIBE_MAX_RETRY_WAIT_INTERVAL_MS;
131 : }
132 :
133 12 : if (maxWaitTimeInMsec != 0)
134 : {
135 3 : minWaitTimeInMsec = (CHIP_RESUBSCRIBE_MIN_WAIT_TIME_INTERVAL_PERCENT_PER_STEP * maxWaitTimeInMsec) / 100;
136 3 : waitTimeInMsec = minWaitTimeInMsec + (Crypto::GetRandU32() % (maxWaitTimeInMsec - minWaitTimeInMsec));
137 : }
138 :
139 12 : if (mMinimalResubscribeDelay.count() > waitTimeInMsec)
140 : {
141 0 : waitTimeInMsec = mMinimalResubscribeDelay.count();
142 : }
143 :
144 12 : return waitTimeInMsec;
145 : }
146 :
147 12 : CHIP_ERROR ReadClient::ScheduleResubscription(uint32_t aTimeTillNextResubscriptionMs, Optional<SessionHandle> aNewSessionHandle,
148 : bool aReestablishCASE)
149 : {
150 12 : VerifyOrReturnError(IsIdle(), CHIP_ERROR_INCORRECT_STATE);
151 :
152 : //
153 : // If we're establishing CASE, make sure we are not provided a new SessionHandle as well.
154 : //
155 12 : VerifyOrReturnError(!aReestablishCASE || !aNewSessionHandle.HasValue(), CHIP_ERROR_INVALID_ARGUMENT);
156 :
157 12 : if (aNewSessionHandle.HasValue())
158 : {
159 0 : mReadPrepareParams.mSessionHolder.Grab(aNewSessionHandle.Value());
160 : }
161 :
162 12 : mForceCaseOnNextResub = aReestablishCASE;
163 12 : if (mForceCaseOnNextResub && mReadPrepareParams.mSessionHolder)
164 : {
165 : // Mark our existing session defunct, so that we will try to
166 : // re-establish it when the timer fires (unless something re-establishes
167 : // before then).
168 0 : mReadPrepareParams.mSessionHolder->AsSecureSession()->MarkAsDefunct();
169 : }
170 :
171 12 : ReturnErrorOnFailure(
172 : InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
173 : System::Clock::Milliseconds32(aTimeTillNextResubscriptionMs), OnResubscribeTimerCallback, this));
174 12 : mIsResubscriptionScheduled = true;
175 :
176 12 : return CHIP_NO_ERROR;
177 : }
178 :
// Terminates the current interaction and notifies the application.
//
// aError              - reason for closing; CHIP_NO_ERROR means a normal completion.
// allowResubscription - when false, the application is never asked whether to resubscribe.
//
// Read interactions: report aError through OnError (if it is an error), release the exchange
// and call OnDone.
// Subscribe interactions: reset subscription state, then either hand control to the
// resubscription path (returning early, without OnDone) or report the error, stop
// resubscription, release the exchange and call OnDone.
179 1021 : void ReadClient::Close(CHIP_ERROR aError, bool allowResubscription)
180 : {
181 1021 : if (IsReadType())
182 : {
183 740 : if (aError != CHIP_NO_ERROR)
184 : {
185 49 : mpCallback.OnError(aError);
186 : }
187 : }
188 : else
189 : {
// The subscription-setup metric is only ended here if setup had not completed yet.
190 281 : if (IsAwaitingInitialReport() || IsAwaitingSubscribeResponse())
191 : {
192 : MATTER_LOG_METRIC_END(Tracing::kMetricDeviceSubscriptionSetup, aError);
193 : }
194 :
// Resets the state to Idle before any callback below runs.
195 281 : ClearActiveSubscriptionState();
196 281 : if (aError != CHIP_NO_ERROR)
197 : {
198 : //
199 : // We infer that re-subscription was requested by virtue of having a non-zero list of event OR attribute paths present
200 : // in mReadPrepareParams. This would only be the case if an application called SendAutoResubscribeRequest which
201 : // populates mReadPrepareParams with the values provided by the application.
202 : //
203 114 : if (allowResubscription &&
204 47 : (mReadPrepareParams.mEventPathParamsListSize != 0 || mReadPrepareParams.mAttributePathParamsListSize != 0))
205 : {
// Remember the incoming reason; aError is overwritten with the callback's verdict.
206 14 : CHIP_ERROR originalReason = aError;
207 :
208 14 : aError = mpCallback.OnResubscriptionNeeded(this, aError);
// CHIP_NO_ERROR from the callback: return without OnError/OnDone so the client stays alive.
209 14 : if (aError == CHIP_NO_ERROR)
210 : {
211 14 : return;
212 : }
// The callback may only answer with the LIT inactive-timeout error if that was the original reason;
// in that case the client is parked in InactiveICDSubscription instead of being torn down.
213 2 : if (aError == CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT)
214 : {
215 2 : VerifyOrDie(originalReason == CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT);
216 2 : ChipLogProgress(DataManagement, "ICD device is inactive mark subscription as InactiveICDSubscription");
217 2 : MoveToState(ClientState::InactiveICDSubscription);
218 2 : return;
219 : }
220 : }
221 :
222 : //
223 : // Either something bad happened when requesting resubscription or the application has decided to not
224 : // continue by returning an error. Let's convey the error back up to the application
225 : // and shut everything down.
226 : //
227 100 : mpCallback.OnError(aError);
228 : }
229 :
230 267 : StopResubscription();
231 : }
232 :
233 1007 : mExchange.Release();
234 :
// OnDone is the last call made here; nothing touches this object afterwards in this function.
235 1007 : mpCallback.OnDone(this);
236 : }
237 :
238 1951 : const char * ReadClient::GetStateStr() const
239 : {
240 : #if CHIP_DETAIL_LOGGING
241 1951 : switch (mState)
242 : {
243 281 : case ClientState::Idle:
244 281 : return "Idle";
245 1108 : case ClientState::AwaitingInitialReport:
246 1108 : return "AwaitingInitialReport";
247 283 : case ClientState::AwaitingSubscribeResponse:
248 283 : return "AwaitingSubscribeResponse";
249 277 : case ClientState::SubscriptionActive:
250 277 : return "SubscriptionActive";
251 2 : case ClientState::InactiveICDSubscription:
252 2 : return "InactiveICDSubscription";
253 : }
254 : #endif // CHIP_DETAIL_LOGGING
255 0 : return "N/A";
256 : }
257 :
258 1951 : void ReadClient::MoveToState(const ClientState aTargetState)
259 : {
260 1951 : mState = aTargetState;
261 1951 : ChipLogDetail(DataManagement, "%s ReadClient[%p]: Moving to [%10.10s]", __func__, this, GetStateStr());
262 1951 : }
263 :
264 868 : CHIP_ERROR ReadClient::SendRequest(ReadPrepareParams & aReadPrepareParams)
265 : {
266 868 : if (mInteractionType == InteractionType::Read)
267 : {
268 792 : return SendReadRequest(aReadPrepareParams);
269 : }
270 :
271 76 : if (mInteractionType == InteractionType::Subscribe)
272 : {
273 76 : return SendSubscribeRequest(aReadPrepareParams);
274 : }
275 :
276 0 : return CHIP_ERROR_INVALID_ARGUMENT;
277 : }
278 :
// Encodes a ReadRequestMessage from aReadPrepareParams and sends it on a new exchange.
//
// Returns CHIP_ERROR_INCORRECT_STATE if the client is not Idle,
// CHIP_ERROR_MISSING_SECURE_SESSION if the params hold no session, CHIP_ERROR_NO_MEMORY if no
// exchange could be allocated, or any encoding/sending error. On success the client moves to
// AwaitingInitialReport.
279 792 : CHIP_ERROR ReadClient::SendReadRequest(ReadPrepareParams & aReadPrepareParams)
280 : {
281 792 : CHIP_ERROR err = CHIP_NO_ERROR;
282 :
283 792 : ChipLogDetail(DataManagement, "%s ReadClient[%p]: Sending Read Request", __func__, this);
284 :
285 792 : VerifyOrReturnError(ClientState::Idle == mState, err = CHIP_ERROR_INCORRECT_STATE);
286 :
// Non-owning views over the caller-provided path and filter arrays.
287 : Span<AttributePathParams> attributePaths(aReadPrepareParams.mpAttributePathParamsList,
288 792 : aReadPrepareParams.mAttributePathParamsListSize);
289 792 : Span<EventPathParams> eventPaths(aReadPrepareParams.mpEventPathParamsList, aReadPrepareParams.mEventPathParamsListSize);
290 : Span<DataVersionFilter> dataVersionFilters(aReadPrepareParams.mpDataVersionFilterList,
291 792 : aReadPrepareParams.mDataVersionFilterListSize);
292 :
293 792 : System::PacketBufferHandle msgBuf;
294 792 : ReadRequestMessage::Builder request;
295 792 : System::PacketBufferTLVWriter writer;
296 :
// Space reserved here is given back (UnreserveBuffer) once the data version filters are encoded.
297 792 : InitWriterWithSpaceReserved(writer, kReservedSizeForTLVEncodingOverhead);
298 792 : ReturnErrorOnFailure(request.Init(&writer));
299 :
300 792 : if (!attributePaths.empty())
301 : {
302 673 : AttributePathIBs::Builder & attributePathListBuilder = request.CreateAttributeRequests();
303 673 : ReturnErrorOnFailure(err = request.GetError());
304 673 : ReturnErrorOnFailure(GenerateAttributePaths(attributePathListBuilder, attributePaths));
305 : }
306 :
307 792 : if (!eventPaths.empty())
308 : {
309 317 : EventPathIBs::Builder & eventPathListBuilder = request.CreateEventRequests();
310 317 : ReturnErrorOnFailure(err = request.GetError());
311 :
312 317 : ReturnErrorOnFailure(GenerateEventPaths(eventPathListBuilder, eventPaths));
313 :
// An event filter is only encoded when GetMinEventNumber produced a value.
314 317 : Optional<EventNumber> eventMin;
315 317 : ReturnErrorOnFailure(GetMinEventNumber(aReadPrepareParams, eventMin));
316 317 : if (eventMin.HasValue())
317 : {
318 314 : EventFilterIBs::Builder & eventFilters = request.CreateEventFilters();
319 314 : ReturnErrorOnFailure(err = request.GetError());
320 314 : ReturnErrorOnFailure(eventFilters.GenerateEventFilter(eventMin.Value()));
321 : }
322 : }
323 :
324 792 : ReturnErrorOnFailure(request.IsFabricFiltered(aReadPrepareParams.mIsFabricFiltered).GetError());
325 :
// Checkpoint before opening the data version filter list so the (possibly empty) list
// can be rolled back if nothing ends up being encoded into it.
326 792 : bool encodedDataVersionList = false;
327 792 : TLV::TLVWriter backup;
328 792 : request.Checkpoint(backup);
329 792 : DataVersionFilterIBs::Builder & dataVersionFilterListBuilder = request.CreateDataVersionFilters();
330 792 : ReturnErrorOnFailure(request.GetError());
331 792 : if (!attributePaths.empty())
332 : {
333 673 : ReturnErrorOnFailure(GenerateDataVersionFilterList(dataVersionFilterListBuilder, attributePaths, dataVersionFilters,
334 : encodedDataVersionList));
335 : }
336 792 : ReturnErrorOnFailure(dataVersionFilterListBuilder.GetWriter()->UnreserveBuffer(kReservedSizeForTLVEncodingOverhead));
337 792 : if (encodedDataVersionList)
338 : {
339 80 : ReturnErrorOnFailure(dataVersionFilterListBuilder.EndOfDataVersionFilterIBs());
340 : }
341 : else
342 : {
343 712 : request.Rollback(backup);
344 : }
345 :
346 792 : ReturnErrorOnFailure(request.EndOfReadRequestMessage());
347 792 : ReturnErrorOnFailure(writer.Finalize(&msgBuf));
348 :
// The session is only checked after the message has been fully encoded.
349 792 : VerifyOrReturnError(aReadPrepareParams.mSessionHolder, CHIP_ERROR_MISSING_SECURE_SESSION);
350 :
351 792 : auto exchange = mpExchangeMgr->NewContext(aReadPrepareParams.mSessionHolder.Get().Value(), this);
352 792 : VerifyOrReturnError(exchange != nullptr, err = CHIP_ERROR_NO_MEMORY);
353 :
354 792 : mExchange.Grab(exchange);
355 :
// A zero timeout in the params means "use the suggested timeout" rather than "no timeout".
356 792 : if (aReadPrepareParams.mTimeout == System::Clock::kZero)
357 : {
358 792 : mExchange->UseSuggestedResponseTimeout(app::kExpectedIMProcessingTime);
359 : }
360 : else
361 : {
362 0 : mExchange->SetResponseTimeout(aReadPrepareParams.mTimeout);
363 : }
364 :
365 792 : ReturnErrorOnFailure(mExchange->SendMessage(Protocols::InteractionModel::MsgType::ReadRequest, std::move(msgBuf),
366 : Messaging::SendFlags(Messaging::SendMessageFlags::kExpectResponse)));
367 :
// Peer identity and state are only updated once the request has been sent successfully.
368 792 : mPeer = aReadPrepareParams.mSessionHolder->AsSecureSession()->GetPeer();
369 792 : MoveToState(ClientState::AwaitingInitialReport);
370 :
371 792 : return CHIP_NO_ERROR;
372 792 : }
373 :
374 343 : CHIP_ERROR ReadClient::GenerateEventPaths(EventPathIBs::Builder & aEventPathsBuilder, const Span<EventPathParams> & aEventPaths)
375 : {
376 854 : for (auto & event : aEventPaths)
377 : {
378 511 : VerifyOrReturnError(event.IsValidEventPath(), CHIP_ERROR_IM_MALFORMED_EVENT_PATH_IB);
379 511 : EventPathIB::Builder & path = aEventPathsBuilder.CreatePath();
380 511 : ReturnErrorOnFailure(aEventPathsBuilder.GetError());
381 511 : ReturnErrorOnFailure(path.Encode(event));
382 : }
383 :
384 343 : return aEventPathsBuilder.EndOfEventPaths();
385 : }
386 :
387 978 : CHIP_ERROR ReadClient::GenerateAttributePaths(AttributePathIBs::Builder & aAttributePathIBsBuilder,
388 : const Span<AttributePathParams> & aAttributePaths)
389 : {
390 2970 : for (auto & attribute : aAttributePaths)
391 : {
392 1994 : VerifyOrReturnError(attribute.IsValidAttributePath(), CHIP_ERROR_IM_MALFORMED_ATTRIBUTE_PATH_IB);
393 1992 : AttributePathIB::Builder & path = aAttributePathIBsBuilder.CreatePath();
394 1992 : ReturnErrorOnFailure(aAttributePathIBsBuilder.GetError());
395 1992 : ReturnErrorOnFailure(path.Encode(attribute));
396 : }
397 :
398 976 : return aAttributePathIBsBuilder.EndOfAttributePathIBs();
399 : }
400 :
// Encodes the caller-supplied data version filters that are relevant to the requested
// attribute paths.
//
// aDataVersionFilterIBsBuilder - builder of the filter list being written.
// aAttributePaths              - attribute paths of the request; used to discard irrelevant filters.
// aDataVersionFilters          - candidate filters.
// aEncodedDataVersionList      - set to true once at least one filter was encoded; never reset here.
//
// Returns CHIP_ERROR_INVALID_ARGUMENT for an invalid filter, or any encoding error other than
// running out of space. Running out of space is not an error: the remaining filters are dropped.
401 969 : CHIP_ERROR ReadClient::BuildDataVersionFilterList(DataVersionFilterIBs::Builder & aDataVersionFilterIBsBuilder,
402 : const Span<AttributePathParams> & aAttributePaths,
403 : const Span<DataVersionFilter> & aDataVersionFilters,
404 : bool & aEncodedDataVersionList)
405 : {
// The counters exist only for the progress log at the end of the function.
406 : #if CHIP_PROGRESS_LOGGING
407 969 : size_t encodedFilterCount = 0;
408 969 : size_t irrelevantFilterCount = 0;
409 969 : size_t skippedFilterCount = 0;
410 : #endif
411 3462 : for (auto & filter : aDataVersionFilters)
412 : {
413 2494 : VerifyOrReturnError(filter.IsValidDataVersionFilter(), CHIP_ERROR_INVALID_ARGUMENT);
414 :
415 : // If data version filter is for some cluster none of whose attributes are included in our paths, discard this filter.
416 2494 : bool intersected = false;
417 2506 : for (auto & path : aAttributePaths)
418 : {
419 2500 : if (path.IncludesAttributesInCluster(filter))
420 : {
421 2488 : intersected = true;
422 2488 : break;
423 : }
424 : }
425 :
426 2494 : if (!intersected)
427 : {
428 : #if CHIP_PROGRESS_LOGGING
429 6 : ++irrelevantFilterCount;
430 : #endif
431 6 : continue;
432 : }
433 :
// Checkpoint per filter so a partially written filter can be undone if space runs out.
434 2488 : TLV::TLVWriter backup;
435 2488 : aDataVersionFilterIBsBuilder.Checkpoint(backup);
436 2488 : CHIP_ERROR err = aDataVersionFilterIBsBuilder.EncodeDataVersionFilterIB(filter);
437 2488 : if (err == CHIP_NO_ERROR)
438 : {
439 : #if CHIP_PROGRESS_LOGGING
440 2487 : ++encodedFilterCount;
441 : #endif
442 2487 : aEncodedDataVersionList = true;
443 : }
444 1 : else if (err == CHIP_ERROR_NO_MEMORY || err == CHIP_ERROR_BUFFER_TOO_SMALL)
445 : {
446 : // Packet is full, ignore the rest of the list
447 1 : aDataVersionFilterIBsBuilder.Rollback(backup);
448 : #if CHIP_PROGRESS_LOGGING
// The skipped count is everything from the current filter to the end of the span,
// derived from the current element's offset within the span.
449 1 : ssize_t nonSkippedFilterCount = &filter - aDataVersionFilters.data();
450 1 : skippedFilterCount = aDataVersionFilters.size() - static_cast<size_t>(nonSkippedFilterCount);
451 : #endif // CHIP_PROGRESS_LOGGING
452 1 : break;
453 : }
454 : else
455 : {
456 0 : return err;
457 : }
458 : }
459 :
460 969 : ChipLogProgress(DataManagement,
461 : "%lu data version filters provided, %lu not relevant, %lu encoded, %lu skipped due to lack of space",
462 : static_cast<unsigned long>(aDataVersionFilters.size()), static_cast<unsigned long>(irrelevantFilterCount),
463 : static_cast<unsigned long>(encodedFilterCount), static_cast<unsigned long>(skippedFilterCount));
464 969 : return CHIP_NO_ERROR;
465 : }
466 :
467 974 : CHIP_ERROR ReadClient::GenerateDataVersionFilterList(DataVersionFilterIBs::Builder & aDataVersionFilterIBsBuilder,
468 : const Span<AttributePathParams> & aAttributePaths,
469 : const Span<DataVersionFilter> & aDataVersionFilters,
470 : bool & aEncodedDataVersionList)
471 : {
472 : // Give the callback a chance first, otherwise use the list we have, if any.
473 974 : ReturnErrorOnFailure(
474 : mpCallback.OnUpdateDataVersionFilterList(aDataVersionFilterIBsBuilder, aAttributePaths, aEncodedDataVersionList));
475 :
476 974 : if (!aEncodedDataVersionList)
477 : {
478 969 : ReturnErrorOnFailure(BuildDataVersionFilterList(aDataVersionFilterIBsBuilder, aAttributePaths, aDataVersionFilters,
479 : aEncodedDataVersionList));
480 : }
481 :
482 974 : return CHIP_NO_ERROR;
483 : }
484 :
485 4 : void ReadClient::OnActiveModeNotification()
486 : {
487 4 : VerifyOrDie(mpImEngine->InActiveReadClientList(this));
488 : // When we reach here, the subscription definitely exceeded the liveness timeout. Just continue the unfinished resubscription
489 : // logic in `OnLivenessTimeoutCallback`.
490 4 : if (IsInactiveICDSubscription())
491 : {
492 1 : TriggerResubscriptionForLivenessTimeout(CHIP_ERROR_TIMEOUT);
493 1 : return;
494 : }
495 :
496 3 : TriggerResubscribeIfScheduled("check-in message");
497 : }
498 :
499 3 : void ReadClient::OnPeerTypeChange(PeerType aType)
500 : {
501 3 : VerifyOrDie(mpImEngine->InActiveReadClientList(this));
502 :
503 3 : mIsPeerLIT = (aType == PeerType::kLITICD);
504 :
505 3 : ChipLogProgress(DataManagement, "Peer is now %s LIT ICD.", mIsPeerLIT ? "a" : "not a");
506 :
507 : // If the peer is no longer LIT, try to wake up the subscription and do resubscribe when necessary.
508 3 : if (!mIsPeerLIT)
509 : {
510 2 : OnActiveModeNotification();
511 : }
512 3 : }
513 :
// Exchange delegate entry point for messages received on an exchange this client owns.
//
// Dispatches on the message type (ReportData, SubscribeResponse, StatusResponse); anything else
// is CHIP_ERROR_INVALID_MESSAGE_TYPE. On error a StatusResponse is sent back and the client is
// closed. A Read interaction is also closed once no more chunks are pending.
// Returns the processing error, or CHIP_NO_ERROR.
514 2178 : CHIP_ERROR ReadClient::OnMessageReceived(Messaging::ExchangeContext * apExchangeContext, const PayloadHeader & aPayloadHeader,
515 : System::PacketBufferHandle && aPayload)
516 : {
517 2178 : CHIP_ERROR err = CHIP_NO_ERROR;
// Default status reported to the peer on failure; refined at `exit` for invalid subscriptions.
518 2178 : Status status = Status::InvalidAction;
// No message is expected while Idle or while parked as an inactive ICD subscription.
519 2178 : VerifyOrExit(!IsIdle() && !IsInactiveICDSubscription(), err = CHIP_ERROR_INCORRECT_STATE);
520 :
521 2178 : if (aPayloadHeader.HasMessageType(Protocols::InteractionModel::MsgType::ReportData))
522 : {
523 1829 : err = ProcessReportData(std::move(aPayload), ReportType::kContinuingTransaction);
524 : }
525 349 : else if (aPayloadHeader.HasMessageType(Protocols::InteractionModel::MsgType::SubscribeResponse))
526 : {
527 279 : ChipLogProgress(DataManagement, "SubscribeResponse is received");
528 279 : VerifyOrExit(apExchangeContext == mExchange.Get(), err = CHIP_ERROR_INCORRECT_STATE);
529 279 : err = ProcessSubscribeResponse(std::move(aPayload));
530 : MATTER_LOG_METRIC_END(Tracing::kMetricDeviceSubscriptionSetup, err);
531 : }
532 70 : else if (aPayloadHeader.HasMessageType(Protocols::InteractionModel::MsgType::StatusResponse))
533 : {
534 138 : VerifyOrExit(apExchangeContext == mExchange.Get(), err = CHIP_ERROR_INCORRECT_STATE);
535 70 : CHIP_ERROR statusError = CHIP_NO_ERROR;
536 70 : SuccessOrExit(err = StatusResponse::ProcessStatusResponse(std::move(aPayload), statusError));
537 70 : SuccessOrExit(err = statusError);
// Reaching this line means the StatusResponse carried a success status, which is still
// reported as an invalid message type.
538 2 : err = CHIP_ERROR_INVALID_MESSAGE_TYPE;
539 : }
540 : else
541 : {
542 0 : err = CHIP_ERROR_INVALID_MESSAGE_TYPE;
543 : }
544 :
545 2178 : exit:
546 2178 : if (err != CHIP_NO_ERROR)
547 : {
548 76 : if (err == CHIP_ERROR_INVALID_SUBSCRIPTION)
549 : {
550 4 : status = Status::InvalidSubscription;
551 : }
// The result of sending the status is intentionally not folded into err.
552 76 : StatusResponse::Send(status, apExchangeContext, false /*aExpectResponse*/);
553 : }
554 :
// Close on any error, or when a Read interaction has received its last chunk.
555 2178 : if ((!IsSubscriptionType() && !mPendingMoreChunks) || err != CHIP_NO_ERROR)
556 : {
557 767 : Close(err);
558 : }
559 :
560 2178 : return err;
561 : }
562 :
563 82 : void ReadClient::OnUnsolicitedReportData(Messaging::ExchangeContext * apExchangeContext, System::PacketBufferHandle && aPayload)
564 : {
565 82 : Status status = Status::Success;
566 82 : mExchange.Grab(apExchangeContext);
567 :
568 : //
569 : // Let's update the session we're tracking in our SessionHolder to that associated with the message that was just received.
570 : // This CAN be different from the one we were tracking before, since the server is permitted to send exchanges on any valid
571 : // session to us, of which there could be multiple.
572 : //
573 : // Since receipt of a message is proof of a working session on the peer, it's always best to update to that if possible
574 : // to maximize our chances of success later.
575 : //
576 82 : mReadPrepareParams.mSessionHolder.Grab(mExchange->GetSessionHandle());
577 :
578 82 : CHIP_ERROR err = ProcessReportData(std::move(aPayload), ReportType::kUnsolicited);
579 82 : if (err != CHIP_NO_ERROR)
580 : {
581 0 : if (err == CHIP_ERROR_INVALID_SUBSCRIPTION)
582 : {
583 0 : status = Status::InvalidSubscription;
584 : }
585 : else
586 : {
587 0 : status = Status::InvalidAction;
588 : }
589 :
590 0 : StatusResponse::Send(status, mExchange.Get(), false /*aExpectResponse*/);
591 0 : Close(err);
592 : }
593 82 : }
594 :
// Parses one ReportData message and dispatches its event and attribute reports.
//
// aPayload    - the encoded ReportData message.
// aReportType - whether the report continues a transaction we started or arrived unsolicited.
//
// Returns the first parsing/validation error, the result of refreshing the liveness timer, or
// the result of sending the StatusResponse; CHIP_NO_ERROR otherwise.
// The code after the `exit` label runs on both the success and the failure paths.
595 1919 : CHIP_ERROR ReadClient::ProcessReportData(System::PacketBufferHandle && aPayload, ReportType aReportType)
596 : {
597 1919 : CHIP_ERROR err = CHIP_NO_ERROR;
598 1919 : ReportDataMessage::Parser report;
// Starts out true so that no StatusResponse is sent if parsing fails before the field is read.
599 1919 : bool suppressResponse = true;
600 1919 : SubscriptionId subscriptionId = 0;
601 1919 : EventReportIBs::Parser eventReportIBs;
602 1919 : AttributeReportIBs::Parser attributeReportIBs;
603 1919 : System::PacketBufferTLVReader reader;
604 1919 : reader.Init(std::move(aPayload));
605 1919 : err = report.Init(reader);
606 1919 : SuccessOrExit(err);
607 :
608 : #if CHIP_CONFIG_IM_PRETTY_PRINT
// Unsolicited reports are not pretty-printed here.
609 1917 : if (aReportType != ReportType::kUnsolicited)
610 : {
611 1835 : report.PrettyPrint();
612 : }
613 : #endif
614 :
// SuppressResponse is optional; when absent a response is expected.
615 1917 : err = report.GetSuppressResponse(&suppressResponse);
616 1917 : if (CHIP_END_OF_TLV == err)
617 : {
618 1218 : suppressResponse = false;
619 1218 : err = CHIP_NO_ERROR;
620 : }
621 1917 : SuccessOrExit(err);
622 :
// A subscription id is only legal on subscription-type clients, and mandatory for them.
// The first priming report establishes the id; later reports must match it.
623 1917 : err = report.GetSubscriptionId(&subscriptionId);
624 1917 : if (CHIP_NO_ERROR == err)
625 : {
626 415 : VerifyOrExit(IsSubscriptionType(), err = CHIP_ERROR_INVALID_ARGUMENT);
627 413 : if (mWaitingForFirstPrimingReport)
628 : {
629 281 : mSubscriptionId = subscriptionId;
630 : }
631 132 : else if (!IsMatchingSubscriptionId(subscriptionId))
632 : {
633 2 : err = CHIP_ERROR_INVALID_SUBSCRIPTION;
634 : }
635 : }
636 1502 : else if (CHIP_END_OF_TLV == err)
637 : {
638 1502 : if (IsSubscriptionType())
639 : {
640 0 : err = CHIP_ERROR_INVALID_ARGUMENT;
641 : }
642 : else
643 : {
644 1502 : err = CHIP_NO_ERROR;
645 : }
646 : }
647 1915 : SuccessOrExit(err);
648 :
// MoreChunkedMessages is optional; when absent this is the last chunk.
649 1913 : err = report.GetMoreChunkedMessages(&mPendingMoreChunks);
650 1913 : if (CHIP_END_OF_TLV == err)
651 : {
652 1048 : mPendingMoreChunks = false;
653 1048 : err = CHIP_NO_ERROR;
654 : }
655 1913 : SuccessOrExit(err);
656 :
// Event reports are optional and are processed before attribute reports.
657 1913 : err = report.GetEventReports(&eventReportIBs);
658 1913 : if (err == CHIP_END_OF_TLV)
659 : {
660 1273 : err = CHIP_NO_ERROR;
661 : }
662 640 : else if (err == CHIP_NO_ERROR)
663 : {
664 640 : chip::TLV::TLVReader EventReportsReader;
665 640 : eventReportIBs.GetReader(&EventReportsReader);
666 640 : err = ProcessEventReportIBs(EventReportsReader);
667 : }
668 1913 : SuccessOrExit(err);
669 :
// Attribute reports are optional as well.
670 1913 : err = report.GetAttributeReportIBs(&attributeReportIBs);
671 1913 : if (err == CHIP_END_OF_TLV)
672 : {
673 631 : err = CHIP_NO_ERROR;
674 : }
675 1282 : else if (err == CHIP_NO_ERROR)
676 : {
677 1282 : TLV::TLVReader attributeReportIBsReader;
678 1282 : attributeReportIBs.GetReader(&attributeReportIBsReader);
679 1282 : err = ProcessAttributeReportIBs(attributeReportIBsReader);
680 : }
681 1913 : SuccessOrExit(err);
682 :
// OnReportEnd is delivered once the final chunk of a report has been processed.
683 1911 : if (mIsReporting && !mPendingMoreChunks)
684 : {
685 1017 : mpCallback.OnReportEnd();
686 1017 : mIsReporting = false;
687 : }
688 :
689 1911 : SuccessOrExit(err = report.ExitContainer());
690 :
691 1919 : exit:
692 1919 : if (IsSubscriptionType())
693 : {
// This state transition happens regardless of err.
694 415 : if (IsAwaitingInitialReport())
695 : {
696 283 : MoveToState(ClientState::AwaitingSubscribeResponse);
697 : }
698 132 : else if (IsSubscriptionActive() && err == CHIP_NO_ERROR)
699 : {
700 : //
701 : // Only refresh the liveness check timer if we've successfully established
702 : // a subscription and have a valid value for mMaxInterval which the function
703 : // relies on.
704 : //
705 103 : mpCallback.NotifySubscriptionStillActive(*this);
706 103 : err = RefreshLivenessCheckTimer();
707 : }
708 : }
709 :
710 1919 : if (!suppressResponse && err == CHIP_NO_ERROR)
711 : {
// A further message is expected unless this was the final chunk of a report on an
// active subscription.
712 1216 : bool noResponseExpected = IsSubscriptionActive() && !mPendingMoreChunks;
713 1216 : err = StatusResponse::Send(Status::Success, mExchange.Get(), !noResponseExpected);
714 : }
715 :
// Cleared on every call, including failed ones.
716 1919 : mWaitingForFirstPrimingReport = false;
717 1919 : return err;
718 1919 : }
719 :
720 12 : void ReadClient::OnResponseTimeout(Messaging::ExchangeContext * apExchangeContext)
721 : {
722 12 : ChipLogError(DataManagement, "Time out! failed to receive report data from Exchange: " ChipLogFormatExchange,
723 : ChipLogValueExchange(apExchangeContext));
724 :
725 12 : Close(CHIP_ERROR_TIMEOUT);
726 12 : }
727 :
728 5 : CHIP_ERROR ReadClient::ReadICDOperatingModeFromAttributeDataIB(TLV::TLVReader && aReader, PeerType & aType)
729 : {
730 : Clusters::IcdManagement::Attributes::OperatingMode::TypeInfo::DecodableType operatingMode;
731 :
732 5 : CHIP_ERROR err = DataModel::Decode(aReader, operatingMode);
733 5 : ReturnErrorOnFailure(err);
734 :
735 5 : switch (operatingMode)
736 : {
737 4 : case Clusters::IcdManagement::OperatingModeEnum::kSit:
738 4 : aType = PeerType::kNormal;
739 4 : break;
740 1 : case Clusters::IcdManagement::OperatingModeEnum::kLit:
741 1 : aType = PeerType::kLITICD;
742 1 : break;
743 0 : default:
744 0 : err = CHIP_ERROR_INVALID_ARGUMENT;
745 0 : break;
746 : }
747 :
748 5 : return err;
749 : }
750 :
751 4429 : CHIP_ERROR ReadClient::ProcessAttributePath(AttributePathIB::Parser & aAttributePathParser,
752 : ConcreteDataAttributePath & aAttributePath)
753 : {
754 : // The ReportData must contain a concrete attribute path. Don't validate ID
755 : // ranges here, so we can tell apart "malformed data" and "out of range
756 : // IDs".
757 4429 : CHIP_ERROR err = CHIP_NO_ERROR;
758 : // The ReportData must contain a concrete attribute path
759 4429 : err = aAttributePathParser.GetConcreteAttributePath(aAttributePath, AttributePathIB::ValidateIdRanges::kNo);
760 4429 : VerifyOrReturnError(err == CHIP_NO_ERROR, CHIP_ERROR_IM_MALFORMED_ATTRIBUTE_PATH_IB);
761 4427 : return CHIP_NO_ERROR;
762 : }
763 :
764 6004 : void ReadClient::NoteReportingData()
765 : {
766 6004 : if (!mIsReporting)
767 : {
768 1071 : mpCallback.OnReportBegin();
769 1071 : mIsReporting = true;
770 : }
771 6004 : }
772 :
773 1282 : CHIP_ERROR ReadClient::ProcessAttributeReportIBs(TLV::TLVReader & aAttributeReportIBsReader)
774 : {
775 1282 : CHIP_ERROR err = CHIP_NO_ERROR;
776 5709 : while (CHIP_NO_ERROR == (err = aAttributeReportIBsReader.Next()))
777 : {
778 4429 : TLV::TLVReader dataReader;
779 4429 : AttributeReportIB::Parser report;
780 4429 : AttributeDataIB::Parser data;
781 4429 : AttributeStatusIB::Parser status;
782 4429 : AttributePathIB::Parser path;
783 4429 : ConcreteDataAttributePath attributePath;
784 4429 : StatusIB statusIB;
785 :
786 4429 : TLV::TLVReader reader = aAttributeReportIBsReader;
787 4429 : ReturnErrorOnFailure(report.Init(reader));
788 :
789 4429 : err = report.GetAttributeStatus(&status);
790 4429 : if (CHIP_NO_ERROR == err)
791 : {
792 8 : StatusIB::Parser errorStatus;
793 8 : ReturnErrorOnFailure(status.GetPath(&path));
794 8 : ReturnErrorOnFailure(ProcessAttributePath(path, attributePath));
795 8 : if (!attributePath.IsValid())
796 : {
797 : // Don't fail the entire read or subscription when there is an
798 : // out-of-range ID. Just skip that one AttributeReportIB.
799 0 : ChipLogError(DataManagement,
800 : "Skipping AttributeStatusIB with out-of-range IDs: (%d, " ChipLogFormatMEI ", " ChipLogFormatMEI ") ",
801 : attributePath.mEndpointId, ChipLogValueMEI(attributePath.mClusterId),
802 : ChipLogValueMEI(attributePath.mAttributeId));
803 0 : continue;
804 : }
805 :
806 8 : ReturnErrorOnFailure(status.GetErrorStatus(&errorStatus));
807 8 : ReturnErrorOnFailure(errorStatus.DecodeStatusIB(statusIB));
808 8 : NoteReportingData();
809 8 : mpCallback.OnAttributeData(attributePath, nullptr, statusIB);
810 : }
811 4421 : else if (CHIP_END_OF_TLV == err)
812 : {
813 4421 : ReturnErrorOnFailure(report.GetAttributeData(&data));
814 4421 : ReturnErrorOnFailure(data.GetPath(&path));
815 4421 : ReturnErrorOnFailure(ProcessAttributePath(path, attributePath));
816 4419 : if (!attributePath.IsValid())
817 : {
818 : // Don't fail the entire read or subscription when there is an
819 : // out-of-range ID. Just skip that one AttributeReportIB.
820 2 : ChipLogError(DataManagement,
821 : "Skipping AttributeDataIB with out-of-range IDs: (%d, " ChipLogFormatMEI ", " ChipLogFormatMEI ") ",
822 : attributePath.mEndpointId, ChipLogValueMEI(attributePath.mClusterId),
823 : ChipLogValueMEI(attributePath.mAttributeId));
824 2 : continue;
825 : }
826 :
827 4417 : DataVersion version = 0;
828 4417 : ReturnErrorOnFailure(data.GetDataVersion(&version));
829 4417 : attributePath.mDataVersion.SetValue(version);
830 :
831 4417 : if (mReadPrepareParams.mpDataVersionFilterList != nullptr)
832 : {
833 65 : UpdateDataVersionFilters(attributePath);
834 : }
835 :
836 4417 : ReturnErrorOnFailure(data.GetData(&dataReader));
837 :
838 : // The element in an array may be another array -- so we should only set the list operation when we are handling the
839 : // whole list.
840 4417 : if (!attributePath.IsListOperation() && dataReader.GetType() == TLV::kTLVType_Array)
841 : {
842 2189 : attributePath.mListOp = ConcreteDataAttributePath::ListOperation::ReplaceAll;
843 : }
844 :
845 4417 : if (attributePath.MatchesConcreteAttributePath(ConcreteAttributePath(
846 : kRootEndpointId, Clusters::IcdManagement::Id, Clusters::IcdManagement::Attributes::OperatingMode::Id)))
847 : {
848 : PeerType peerType;
849 5 : TLV::TLVReader operatingModeTlvReader;
850 5 : operatingModeTlvReader.Init(dataReader);
851 5 : if (CHIP_NO_ERROR == ReadICDOperatingModeFromAttributeDataIB(std::move(operatingModeTlvReader), peerType))
852 : {
853 : // It is safe to call `OnPeerTypeChange` since we are in the middle of parsing the attribute data, And
854 : // the subscription should be active so `OnActiveModeNotification` is a no-op in this case.
855 5 : InteractionModelEngine::GetInstance()->OnPeerTypeChange(mPeer, peerType);
856 : }
857 : else
858 : {
859 0 : ChipLogError(DataManagement, "Failed to get ICD state from attribute data with error'%" CHIP_ERROR_FORMAT "'",
860 : err.Format());
861 : }
862 : }
863 :
864 4417 : NoteReportingData();
865 4417 : mpCallback.OnAttributeData(attributePath, &dataReader, statusIB);
866 : }
867 : }
868 :
869 1280 : if (CHIP_END_OF_TLV == err)
870 : {
871 1280 : err = CHIP_NO_ERROR;
872 : }
873 :
874 1280 : return err;
875 : }
876 :
877 640 : CHIP_ERROR ReadClient::ProcessEventReportIBs(TLV::TLVReader & aEventReportIBsReader)
878 : {
879 640 : CHIP_ERROR err = CHIP_NO_ERROR;
880 2219 : while (CHIP_NO_ERROR == (err = aEventReportIBsReader.Next()))
881 : {
882 1579 : TLV::TLVReader dataReader;
883 1579 : EventReportIB::Parser report;
884 1579 : EventDataIB::Parser data;
885 1579 : EventHeader header;
886 1579 : StatusIB statusIB; // Default value for statusIB is success.
887 :
888 1579 : TLV::TLVReader reader = aEventReportIBsReader;
889 1579 : ReturnErrorOnFailure(report.Init(reader));
890 :
891 1579 : err = report.GetEventData(&data);
892 :
893 1579 : if (err == CHIP_NO_ERROR)
894 : {
895 1575 : header.mTimestamp = mEventTimestamp;
896 1575 : ReturnErrorOnFailure(data.DecodeEventHeader(header));
897 1575 : mEventTimestamp = header.mTimestamp;
898 :
899 1575 : ReturnErrorOnFailure(data.GetData(&dataReader));
900 :
901 : //
902 : // Update the event number being tracked in mReadPrepareParams in case
903 : // we want to send it in the next SubscribeRequest message to convey
904 : // the event number for which we have already received an event.
905 : //
906 1575 : mReadPrepareParams.mEventNumber.SetValue(header.mEventNumber + 1);
907 :
908 1575 : NoteReportingData();
909 1575 : mpCallback.OnEventData(header, &dataReader, nullptr);
910 : }
911 4 : else if (err == CHIP_END_OF_TLV)
912 : {
913 4 : EventStatusIB::Parser status;
914 4 : EventPathIB::Parser pathIB;
915 4 : StatusIB::Parser statusIBParser;
916 4 : ReturnErrorOnFailure(report.GetEventStatus(&status));
917 4 : ReturnErrorOnFailure(status.GetPath(&pathIB));
918 4 : ReturnErrorOnFailure(pathIB.GetEventPath(&header.mPath));
919 4 : ReturnErrorOnFailure(status.GetErrorStatus(&statusIBParser));
920 4 : ReturnErrorOnFailure(statusIBParser.DecodeStatusIB(statusIB));
921 :
922 4 : NoteReportingData();
923 4 : mpCallback.OnEventData(header, nullptr, &statusIB);
924 : }
925 : }
926 :
927 640 : if (CHIP_END_OF_TLV == err)
928 : {
929 640 : err = CHIP_NO_ERROR;
930 : }
931 :
932 640 : return err;
933 : }
934 :
935 0 : void ReadClient::OverrideLivenessTimeout(System::Clock::Timeout aLivenessTimeout)
936 : {
937 0 : mLivenessTimeoutOverride = aLivenessTimeout;
938 0 : auto err = RefreshLivenessCheckTimer();
939 0 : if (err != CHIP_NO_ERROR)
940 : {
941 0 : Close(err);
942 : }
943 0 : }
944 :
945 380 : CHIP_ERROR ReadClient::RefreshLivenessCheckTimer()
946 : {
947 380 : CHIP_ERROR err = CHIP_NO_ERROR;
948 :
949 380 : VerifyOrReturnError(IsSubscriptionActive(), CHIP_ERROR_INCORRECT_STATE);
950 :
951 380 : CancelLivenessCheckTimer();
952 :
953 : System::Clock::Timeout timeout;
954 380 : ReturnErrorOnFailure(ComputeLivenessCheckTimerTimeout(&timeout));
955 :
956 : // EFR32/MBED/INFINION/K32W's chrono count return long unsigned, but other platform returns unsigned
957 380 : ChipLogProgress(
958 : DataManagement,
959 : "Refresh LivenessCheckTime for %lu milliseconds with SubscriptionId = 0x%08" PRIx32 " Peer = %02x:" ChipLogFormatX64,
960 : static_cast<long unsigned>(timeout.count()), mSubscriptionId, GetFabricIndex(), ChipLogValueX64(GetPeerNodeId()));
961 380 : err = InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
962 : timeout, OnLivenessTimeoutCallback, this);
963 :
964 380 : return err;
965 : }
966 :
967 380 : CHIP_ERROR ReadClient::ComputeLivenessCheckTimerTimeout(System::Clock::Timeout * aTimeout)
968 : {
969 380 : if (mLivenessTimeoutOverride != System::Clock::kZero)
970 : {
971 0 : *aTimeout = mLivenessTimeoutOverride;
972 0 : return CHIP_NO_ERROR;
973 : }
974 :
975 380 : VerifyOrReturnError(mReadPrepareParams.mSessionHolder, CHIP_ERROR_INCORRECT_STATE);
976 :
977 : //
978 : // To calculate the duration we're willing to wait for a report to come to us, we take into account the maximum interval of
979 : // the subscription AND the time it takes for the report to make it to us in the worst case.
980 : //
981 : // We have no way to estimate what the network latency will be, but we do know the other side will time out its ReportData
982 : // after its computed round-trip timeout plus the processing time it gives us (app::kExpectedIMProcessingTime). Once it
983 : // times out, assuming it sent the report at all, there's no point in us thinking we still have a subscription.
984 : //
985 : // We can't use ComputeRoundTripTimeout() on the session for two reasons: we want the roundtrip timeout from the point of
986 : // view of the peer, not us, and we want to start off with the assumption the peer will likely have, which is that we are
987 : // idle, whereas ComputeRoundTripTimeout() uses the current activity state of the peer.
988 : //
989 : // So recompute the round-trip timeout directly. Assume MRP, since in practice that is likely what is happening.
990 380 : auto & peerMRPConfig = mReadPrepareParams.mSessionHolder->GetRemoteMRPConfig();
991 : // Peer will assume we are idle (hence we pass kZero to GetMessageReceiptTimeout()), but will assume we treat it as active
992 : // for the response, so to match the retransmission timeout computation for the message back to the peer, we should treat
993 : // it as active and handling non-initial message, isFirstMessageOnExchange needs to be set as false for
994 : // GetRetransmissionTimeout.
995 : auto roundTripTimeout =
996 380 : mReadPrepareParams.mSessionHolder->GetMessageReceiptTimeout(System::Clock::kZero, true /*isFirstMessageOnExchange*/) +
997 380 : kExpectedIMProcessingTime +
998 380 : GetRetransmissionTimeout(peerMRPConfig.mActiveRetransTimeout, peerMRPConfig.mIdleRetransTimeout,
999 380 : System::SystemClock().GetMonotonicTimestamp(), peerMRPConfig.mActiveThresholdTime,
1000 380 : false /*isFirstMessageOnExchange*/);
1001 380 : *aTimeout = System::Clock::Seconds16(mMaxInterval) + roundTripTimeout;
1002 380 : return CHIP_NO_ERROR;
1003 : }
1004 :
1005 962 : void ReadClient::CancelLivenessCheckTimer()
1006 : {
1007 962 : InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer(
1008 : OnLivenessTimeoutCallback, this);
1009 962 : }
1010 :
1011 583 : void ReadClient::CancelResubscribeTimer()
1012 : {
1013 583 : InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->CancelTimer(
1014 : OnResubscribeTimerCallback, this);
1015 583 : mIsResubscriptionScheduled = false;
1016 583 : }
1017 :
1018 7 : void ReadClient::OnLivenessTimeoutCallback(System::Layer * apSystemLayer, void * apAppState)
1019 : {
1020 7 : ReadClient * const _this = reinterpret_cast<ReadClient *>(apAppState);
1021 :
1022 : // TODO: add a more specific error here for liveness timeout failure to distinguish between other classes of timeouts (i.e
1023 : // response timeouts).
1024 7 : CHIP_ERROR subscriptionTerminationCause = CHIP_ERROR_TIMEOUT;
1025 :
1026 : //
1027 : // Might as well try to see if this instance exists in the tracked list in the IM.
1028 : // This might blow-up if either the client has since been free'ed (use-after-free), or if the engine has since
1029 : // been shutdown at which point the client wouldn't exist in the active read client list.
1030 : //
1031 7 : VerifyOrDie(_this->mpImEngine->InActiveReadClientList(_this));
1032 :
1033 7 : ChipLogError(DataManagement,
1034 : "Subscription Liveness timeout with SubscriptionID = 0x%08" PRIx32 ", Peer = %02x:" ChipLogFormatX64,
1035 : _this->mSubscriptionId, _this->GetFabricIndex(), ChipLogValueX64(_this->GetPeerNodeId()));
1036 :
1037 7 : if (_this->mIsPeerLIT)
1038 : {
1039 3 : subscriptionTerminationCause = CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT;
1040 : }
1041 :
1042 7 : _this->TriggerResubscriptionForLivenessTimeout(subscriptionTerminationCause);
1043 7 : }
1044 :
1045 8 : void ReadClient::TriggerResubscriptionForLivenessTimeout(CHIP_ERROR aReason)
1046 : {
1047 : // We didn't get a message from the server on time; it's possible that it no
1048 : // longer has a useful CASE session to us. Mark defunct all sessions that
1049 : // have not seen peer activity in at least as long as our session.
1050 8 : const auto & holder = mReadPrepareParams.mSessionHolder;
1051 8 : if (holder)
1052 : {
1053 8 : System::Clock::Timestamp lastPeerActivity = holder->AsSecureSession()->GetLastPeerActivityTime();
1054 8 : mpImEngine->GetExchangeManager()->GetSessionManager()->ForEachMatchingSession(mPeer, [&lastPeerActivity](auto * session) {
1055 8 : if (!session->IsCASESession())
1056 : {
1057 8 : return;
1058 : }
1059 :
1060 0 : if (session->GetLastPeerActivityTime() > lastPeerActivity)
1061 : {
1062 0 : return;
1063 : }
1064 :
1065 0 : session->MarkAsDefunct();
1066 : });
1067 : }
1068 :
1069 8 : Close(aReason);
1070 8 : }
1071 :
1072 279 : CHIP_ERROR ReadClient::ProcessSubscribeResponse(System::PacketBufferHandle && aPayload)
1073 : {
1074 279 : System::PacketBufferTLVReader reader;
1075 279 : reader.Init(std::move(aPayload));
1076 :
1077 279 : SubscribeResponseMessage::Parser subscribeResponse;
1078 279 : ReturnErrorOnFailure(subscribeResponse.Init(reader));
1079 :
1080 : #if CHIP_CONFIG_IM_PRETTY_PRINT
1081 279 : subscribeResponse.PrettyPrint();
1082 : #endif
1083 :
1084 279 : SubscriptionId subscriptionId = 0;
1085 279 : VerifyOrReturnError(subscribeResponse.GetSubscriptionId(&subscriptionId) == CHIP_NO_ERROR, CHIP_ERROR_INVALID_ARGUMENT);
1086 279 : VerifyOrReturnError(IsMatchingSubscriptionId(subscriptionId), CHIP_ERROR_INVALID_SUBSCRIPTION);
1087 277 : ReturnErrorOnFailure(subscribeResponse.GetMaxInterval(&mMaxInterval));
1088 :
1089 : #if CHIP_PROGRESS_LOGGING
1090 277 : auto duration = System::Clock::Milliseconds32(System::SystemClock().GetMonotonicTimestamp() - mSubscribeRequestTime);
1091 : #endif
1092 277 : ChipLogProgress(DataManagement,
1093 : "Subscription established in %" PRIu32 "ms with SubscriptionID = 0x%08" PRIx32 " MinInterval = %u"
1094 : "s MaxInterval = %us Peer = %02x:" ChipLogFormatX64,
1095 : duration.count(), mSubscriptionId, mMinIntervalFloorSeconds, mMaxInterval, GetFabricIndex(),
1096 : ChipLogValueX64(GetPeerNodeId()));
1097 :
1098 277 : ReturnErrorOnFailure(subscribeResponse.ExitContainer());
1099 :
1100 277 : MoveToState(ClientState::SubscriptionActive);
1101 :
1102 277 : mpCallback.OnSubscriptionEstablished(subscriptionId);
1103 :
1104 277 : mNumRetries = 0;
1105 :
1106 277 : ReturnErrorOnFailure(RefreshLivenessCheckTimer());
1107 :
1108 277 : return CHIP_NO_ERROR;
1109 279 : }
1110 :
1111 224 : CHIP_ERROR ReadClient::SendAutoResubscribeRequest(ReadPrepareParams && aReadPrepareParams)
1112 : {
1113 : // Make sure we don't use minimal resubscribe delays from previous attempts
1114 : // for this one.
1115 224 : mMinimalResubscribeDelay = System::Clock::kZero;
1116 :
1117 224 : mReadPrepareParams = std::move(aReadPrepareParams);
1118 224 : CHIP_ERROR err = SendSubscribeRequest(mReadPrepareParams);
1119 224 : if (err != CHIP_NO_ERROR)
1120 : {
1121 1 : StopResubscription();
1122 : }
1123 224 : return err;
1124 : }
1125 :
1126 0 : CHIP_ERROR ReadClient::SendAutoResubscribeRequest(const ScopedNodeId & aPublisherId, ReadPrepareParams && aReadPrepareParams)
1127 : {
1128 0 : mPeer = aPublisherId;
1129 0 : mReadPrepareParams = std::move(aReadPrepareParams);
1130 0 : CHIP_ERROR err = EstablishSessionToPeer();
1131 0 : if (err != CHIP_NO_ERROR)
1132 : {
1133 : // Make sure we call our callback's OnDeallocatePaths.
1134 0 : StopResubscription();
1135 : }
1136 0 : return err;
1137 : }
1138 :
1139 311 : CHIP_ERROR ReadClient::SendSubscribeRequest(const ReadPrepareParams & aReadPrepareParams)
1140 : {
1141 311 : VerifyOrReturnError(aReadPrepareParams.mMinIntervalFloorSeconds <= aReadPrepareParams.mMaxIntervalCeilingSeconds,
1142 : CHIP_ERROR_INVALID_ARGUMENT);
1143 :
1144 308 : auto err = SendSubscribeRequestImpl(aReadPrepareParams);
1145 308 : if (CHIP_NO_ERROR != err)
1146 : {
1147 : MATTER_LOG_METRIC_END(Tracing::kMetricDeviceSubscriptionSetup, err);
1148 : }
1149 308 : return err;
1150 : }
1151 :
1152 308 : CHIP_ERROR ReadClient::SendSubscribeRequestImpl(const ReadPrepareParams & aReadPrepareParams)
1153 : {
1154 : MATTER_LOG_METRIC_BEGIN(Tracing::kMetricDeviceSubscriptionSetup);
1155 :
1156 : #if CHIP_PROGRESS_LOGGING
1157 308 : mSubscribeRequestTime = System::SystemClock().GetMonotonicTimestamp();
1158 : #endif
1159 :
1160 308 : VerifyOrReturnError(ClientState::Idle == mState, CHIP_ERROR_INCORRECT_STATE);
1161 :
1162 308 : if (&aReadPrepareParams != &mReadPrepareParams)
1163 : {
1164 76 : mReadPrepareParams.mSessionHolder = aReadPrepareParams.mSessionHolder;
1165 : }
1166 :
1167 308 : mIsPeerLIT = aReadPrepareParams.mIsPeerLIT;
1168 :
1169 308 : mMinIntervalFloorSeconds = aReadPrepareParams.mMinIntervalFloorSeconds;
1170 :
1171 : // Todo: Remove the below, Update span in ReadPrepareParams
1172 308 : Span<AttributePathParams> attributePaths(aReadPrepareParams.mpAttributePathParamsList,
1173 308 : aReadPrepareParams.mAttributePathParamsListSize);
1174 308 : Span<EventPathParams> eventPaths(aReadPrepareParams.mpEventPathParamsList, aReadPrepareParams.mEventPathParamsListSize);
1175 308 : Span<DataVersionFilter> dataVersionFilters(aReadPrepareParams.mpDataVersionFilterList,
1176 308 : aReadPrepareParams.mDataVersionFilterListSize);
1177 :
1178 308 : System::PacketBufferHandle msgBuf;
1179 308 : System::PacketBufferTLVWriter writer;
1180 308 : SubscribeRequestMessage::Builder request;
1181 308 : InitWriterWithSpaceReserved(writer, kReservedSizeForTLVEncodingOverhead);
1182 :
1183 308 : ReturnErrorOnFailure(request.Init(&writer));
1184 :
1185 308 : request.KeepSubscriptions(aReadPrepareParams.mKeepSubscriptions)
1186 308 : .MinIntervalFloorSeconds(aReadPrepareParams.mMinIntervalFloorSeconds)
1187 308 : .MaxIntervalCeilingSeconds(aReadPrepareParams.mMaxIntervalCeilingSeconds);
1188 :
1189 308 : if (!attributePaths.empty())
1190 : {
1191 301 : AttributePathIBs::Builder & attributePathListBuilder = request.CreateAttributeRequests();
1192 301 : ReturnErrorOnFailure(attributePathListBuilder.GetError());
1193 301 : ReturnErrorOnFailure(GenerateAttributePaths(attributePathListBuilder, attributePaths));
1194 : }
1195 :
1196 308 : if (!eventPaths.empty())
1197 : {
1198 22 : EventPathIBs::Builder & eventPathListBuilder = request.CreateEventRequests();
1199 22 : ReturnErrorOnFailure(eventPathListBuilder.GetError());
1200 22 : ReturnErrorOnFailure(GenerateEventPaths(eventPathListBuilder, eventPaths));
1201 :
1202 22 : Optional<EventNumber> eventMin;
1203 22 : ReturnErrorOnFailure(GetMinEventNumber(aReadPrepareParams, eventMin));
1204 22 : if (eventMin.HasValue())
1205 : {
1206 0 : EventFilterIBs::Builder & eventFilters = request.CreateEventFilters();
1207 0 : ReturnErrorOnFailure(request.GetError());
1208 0 : ReturnErrorOnFailure(eventFilters.GenerateEventFilter(eventMin.Value()));
1209 : }
1210 : }
1211 :
1212 308 : ReturnErrorOnFailure(request.IsFabricFiltered(aReadPrepareParams.mIsFabricFiltered).GetError());
1213 :
1214 308 : bool encodedDataVersionList = false;
1215 308 : TLV::TLVWriter backup;
1216 308 : request.Checkpoint(backup);
1217 308 : DataVersionFilterIBs::Builder & dataVersionFilterListBuilder = request.CreateDataVersionFilters();
1218 308 : ReturnErrorOnFailure(request.GetError());
1219 308 : if (!attributePaths.empty())
1220 : {
1221 301 : ReturnErrorOnFailure(GenerateDataVersionFilterList(dataVersionFilterListBuilder, attributePaths, dataVersionFilters,
1222 : encodedDataVersionList));
1223 : }
1224 308 : ReturnErrorOnFailure(dataVersionFilterListBuilder.GetWriter()->UnreserveBuffer(kReservedSizeForTLVEncodingOverhead));
1225 308 : if (encodedDataVersionList)
1226 : {
1227 65 : ReturnErrorOnFailure(dataVersionFilterListBuilder.EndOfDataVersionFilterIBs());
1228 : }
1229 : else
1230 : {
1231 243 : request.Rollback(backup);
1232 : }
1233 :
1234 308 : ReturnErrorOnFailure(request.EndOfSubscribeRequestMessage());
1235 308 : ReturnErrorOnFailure(writer.Finalize(&msgBuf));
1236 :
1237 308 : VerifyOrReturnError(aReadPrepareParams.mSessionHolder, CHIP_ERROR_MISSING_SECURE_SESSION);
1238 :
1239 308 : auto exchange = mpExchangeMgr->NewContext(aReadPrepareParams.mSessionHolder.Get().Value(), this);
1240 308 : if (exchange == nullptr)
1241 : {
1242 0 : if (aReadPrepareParams.mSessionHolder->AsSecureSession()->IsActiveSession())
1243 : {
1244 0 : return CHIP_ERROR_NO_MEMORY;
1245 : }
1246 :
1247 : // Trying to subscribe with a defunct session somehow.
1248 0 : return CHIP_ERROR_INCORRECT_STATE;
1249 : }
1250 :
1251 308 : mExchange.Grab(exchange);
1252 :
1253 308 : if (aReadPrepareParams.mTimeout == System::Clock::kZero)
1254 : {
1255 308 : mExchange->UseSuggestedResponseTimeout(app::kExpectedIMProcessingTime);
1256 : }
1257 : else
1258 : {
1259 0 : mExchange->SetResponseTimeout(aReadPrepareParams.mTimeout);
1260 : }
1261 :
1262 308 : ReturnErrorOnFailure(mExchange->SendMessage(Protocols::InteractionModel::MsgType::SubscribeRequest, std::move(msgBuf),
1263 : Messaging::SendFlags(Messaging::SendMessageFlags::kExpectResponse)));
1264 :
1265 308 : mPeer = aReadPrepareParams.mSessionHolder->AsSecureSession()->GetPeer();
1266 308 : MoveToState(ClientState::AwaitingInitialReport);
1267 :
1268 308 : return CHIP_NO_ERROR;
1269 308 : }
1270 :
1271 6 : CHIP_ERROR ReadClient::DefaultResubscribePolicy(CHIP_ERROR aTerminationCause)
1272 : {
1273 6 : if (aTerminationCause == CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT)
1274 : {
1275 0 : ChipLogProgress(DataManagement, "ICD device is inactive, skipping scheduling resubscribe within DefaultResubscribePolicy");
1276 0 : return CHIP_ERROR_LIT_SUBSCRIBE_INACTIVE_TIMEOUT;
1277 : }
1278 :
1279 6 : VerifyOrReturnError(IsIdle(), CHIP_ERROR_INCORRECT_STATE);
1280 :
1281 6 : auto timeTillNextResubscription = ComputeTimeTillNextSubscription();
1282 6 : ChipLogProgress(DataManagement,
1283 : "Will try to resubscribe to %02x:" ChipLogFormatX64 " at retry index %" PRIu32 " after %" PRIu32
1284 : "ms due to error %" CHIP_ERROR_FORMAT,
1285 : GetFabricIndex(), ChipLogValueX64(GetPeerNodeId()), mNumRetries, timeTillNextResubscription,
1286 : aTerminationCause.Format());
1287 6 : return ScheduleResubscription(timeTillNextResubscription, NullOptional, aTerminationCause == CHIP_ERROR_TIMEOUT);
1288 : }
1289 :
1290 0 : void ReadClient::HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr,
1291 : const SessionHandle & sessionHandle)
1292 : {
1293 0 : ReadClient * const _this = static_cast<ReadClient *>(context);
1294 0 : VerifyOrDie(_this != nullptr);
1295 :
1296 0 : ChipLogProgress(DataManagement, "HandleDeviceConnected");
1297 0 : _this->mReadPrepareParams.mSessionHolder.Grab(sessionHandle);
1298 0 : _this->mpExchangeMgr = &exchangeMgr;
1299 :
1300 0 : _this->mpCallback.OnCASESessionEstablished(sessionHandle, _this->mReadPrepareParams);
1301 :
1302 0 : auto err = _this->SendSubscribeRequest(_this->mReadPrepareParams);
1303 0 : if (err != CHIP_NO_ERROR)
1304 : {
1305 0 : _this->Close(err);
1306 : }
1307 0 : }
1308 :
1309 0 : void ReadClient::HandleDeviceConnectionFailure(void * context, const OperationalSessionSetup::ConnectionFailureInfo & failureInfo)
1310 : {
1311 0 : ReadClient * const _this = static_cast<ReadClient *>(context);
1312 0 : VerifyOrDie(_this != nullptr);
1313 :
1314 0 : ChipLogError(DataManagement, "Failed to establish CASE for re-subscription with error '%" CHIP_ERROR_FORMAT "'",
1315 : failureInfo.error.Format());
1316 :
1317 : #if CHIP_CONFIG_ENABLE_BUSY_HANDLING_FOR_OPERATIONAL_SESSION_SETUP
1318 : #if CHIP_DETAIL_LOGGING
1319 0 : if (failureInfo.requestedBusyDelay.HasValue())
1320 : {
1321 0 : ChipLogDetail(DataManagement, "Will delay resubscription by %u ms due to BUSY response",
1322 : failureInfo.requestedBusyDelay.Value().count());
1323 : }
1324 : #endif // CHIP_DETAIL_LOGGING
1325 0 : _this->mMinimalResubscribeDelay = failureInfo.requestedBusyDelay.ValueOr(System::Clock::kZero);
1326 : #else
1327 : _this->mMinimalResubscribeDelay = System::Clock::kZero;
1328 : #endif // CHIP_CONFIG_ENABLE_BUSY_HANDLING_FOR_OPERATIONAL_SESSION_SETUP
1329 :
1330 0 : _this->Close(failureInfo.error);
1331 0 : }
1332 :
1333 9 : void ReadClient::OnResubscribeTimerCallback(System::Layer * /* If this starts being used, fix callers that pass nullptr */,
1334 : void * apAppState)
1335 : {
1336 9 : ReadClient * const _this = static_cast<ReadClient *>(apAppState);
1337 9 : VerifyOrDie(_this != nullptr);
1338 :
1339 9 : _this->mIsResubscriptionScheduled = false;
1340 :
1341 : CHIP_ERROR err;
1342 :
1343 9 : ChipLogProgress(DataManagement, "OnResubscribeTimerCallback: ForceCASE = %d", _this->mForceCaseOnNextResub);
1344 9 : _this->mNumRetries++;
1345 :
1346 9 : bool allowResubscribeOnError = true;
1347 18 : if (!_this->mReadPrepareParams.mSessionHolder ||
1348 9 : !_this->mReadPrepareParams.mSessionHolder->AsSecureSession()->IsActiveSession())
1349 : {
1350 : // We don't have an active CASE session. We need to go ahead and set
1351 : // one up, if we can.
1352 0 : if (_this->EstablishSessionToPeer() == CHIP_NO_ERROR)
1353 : {
1354 0 : return;
1355 : }
1356 :
1357 0 : if (_this->mForceCaseOnNextResub)
1358 : {
1359 : // Caller asked us to force CASE but we have no way to do CASE.
1360 : // Just stop trying.
1361 0 : allowResubscribeOnError = false;
1362 : }
1363 :
1364 : // No way to send our subscribe request.
1365 0 : err = CHIP_ERROR_INCORRECT_STATE;
1366 0 : ExitNow();
1367 : }
1368 :
1369 9 : err = _this->SendSubscribeRequest(_this->mReadPrepareParams);
1370 :
1371 9 : exit:
1372 9 : if (err != CHIP_NO_ERROR)
1373 : {
1374 : //
1375 : // Call Close (which should trigger re-subscription again) EXCEPT if we got here because we didn't have a valid
1376 : // CASESessionManager pointer when mForceCaseOnNextResub was true.
1377 : //
1378 : // In that case, don't permit re-subscription to occur.
1379 : //
1380 0 : _this->Close(err, allowResubscribeOnError);
1381 : }
1382 : }
1383 :
1384 65 : void ReadClient::UpdateDataVersionFilters(const ConcreteDataAttributePath & aPath)
1385 : {
1386 130 : for (size_t index = 0; index < mReadPrepareParams.mDataVersionFilterListSize; index++)
1387 : {
1388 65 : if (mReadPrepareParams.mpDataVersionFilterList[index].mEndpointId == aPath.mEndpointId &&
1389 65 : mReadPrepareParams.mpDataVersionFilterList[index].mClusterId == aPath.mClusterId)
1390 : {
1391 : // Now we know the current version for this cluster is aPath.mDataVersion.
1392 65 : mReadPrepareParams.mpDataVersionFilterList[index].mDataVersion = aPath.mDataVersion;
1393 : }
1394 : }
1395 65 : }
1396 :
1397 339 : CHIP_ERROR ReadClient::GetMinEventNumber(const ReadPrepareParams & aReadPrepareParams, Optional<EventNumber> & aEventMin)
1398 : {
1399 339 : if (aReadPrepareParams.mEventNumber.HasValue())
1400 : {
1401 312 : aEventMin = aReadPrepareParams.mEventNumber;
1402 : }
1403 : else
1404 : {
1405 27 : ReturnErrorOnFailure(mpCallback.GetHighestReceivedEventNumber(aEventMin));
1406 27 : if (aEventMin.HasValue())
1407 : {
1408 : // We want to start with the first event _after_ the last one we received.
1409 2 : aEventMin.SetValue(aEventMin.Value() + 1);
1410 : }
1411 : }
1412 339 : return CHIP_NO_ERROR;
1413 : }
1414 :
1415 198 : void ReadClient::TriggerResubscribeIfScheduled(const char * reason)
1416 : {
1417 198 : if (!mIsResubscriptionScheduled)
1418 : {
1419 197 : return;
1420 : }
1421 :
1422 1 : ChipLogDetail(DataManagement, "ReadClient[%p] triggering resubscribe, reason: %s", this, reason);
1423 1 : CancelResubscribeTimer();
1424 1 : OnResubscribeTimerCallback(nullptr, this);
1425 : }
1426 :
1427 0 : Optional<System::Clock::Timeout> ReadClient::GetSubscriptionTimeout()
1428 : {
1429 0 : if (!IsSubscriptionType() || !IsSubscriptionActive())
1430 : {
1431 0 : return NullOptional;
1432 : }
1433 :
1434 : System::Clock::Timeout timeout;
1435 0 : CHIP_ERROR err = ComputeLivenessCheckTimerTimeout(&timeout);
1436 0 : if (err != CHIP_NO_ERROR)
1437 : {
1438 0 : return NullOptional;
1439 : }
1440 :
1441 0 : return MakeOptional(timeout);
1442 : }
1443 :
1444 0 : CHIP_ERROR ReadClient::EstablishSessionToPeer()
1445 : {
1446 0 : ChipLogProgress(DataManagement, "Trying to establish a CASE session for subscription");
1447 0 : auto * caseSessionManager = InteractionModelEngine::GetInstance()->GetCASESessionManager();
1448 0 : VerifyOrReturnError(caseSessionManager != nullptr, CHIP_ERROR_INCORRECT_STATE);
1449 0 : caseSessionManager->FindOrEstablishSession(mPeer, &mOnConnectedCallback, &mOnConnectionFailureCallback);
1450 0 : return CHIP_NO_ERROR;
1451 : }
1452 :
1453 : } // namespace app
1454 : } // namespace chip
|