1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.car;
18 
19 import static org.junit.Assert.assertArrayEquals;
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertNotEquals;
22 import static org.junit.Assert.assertNotNull;
23 import static org.junit.Assert.assertNull;
24 
25 import android.car.vms.VmsAssociatedLayer;
26 import android.car.vms.VmsAvailableLayers;
27 import android.car.vms.VmsLayer;
28 import android.car.vms.VmsLayerDependency;
29 import android.car.vms.VmsLayersOffering;
30 import android.car.vms.VmsSubscriberManager;
31 import android.car.vms.VmsSubscriptionState;
32 import android.util.Pair;
33 
34 import androidx.test.ext.junit.runners.AndroidJUnit4;
35 import androidx.test.filters.FlakyTest;
36 import androidx.test.filters.MediumTest;
37 import androidx.test.filters.RequiresDevice;
38 
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.junit.runner.RunWith;
42 
43 import java.util.Arrays;
44 import java.util.Collections;
45 import java.util.HashSet;
46 import java.util.Set;
47 import java.util.function.Supplier;
48 import java.util.function.ToIntFunction;
49 
50 @RunWith(AndroidJUnit4.class)
51 @MediumTest
52 public class VmsPublisherSubscriberTest extends MockedVmsTestBase {
53     private static final VmsLayer SUBSCRIPTION_LAYER = new VmsLayer(1, 1, 1);
54     private static final VmsLayer SUBSCRIPTION_LAYER_OTHER = new VmsLayer(2, 1, 1);
55 
56     private static final byte[] PAYLOAD = {0xa, 0xb};
57     private static final byte[] PAYLOAD_OTHER = {0xb, 0xc};
58 
59     private static final byte[] PUBLISHER_INFO = {0x0};
60     private static final byte[] PUBLISHER_INFO_OTHER = {0x1};
61 
62     private static final int UNKNOWN_PUBLISHER = 99999;
63 
64     private MockPublisherClient mPublisher;
65     private VmsSubscriberManager mSubscriber;
66     private MockSubscriberClient mSubscriberClient;
67 
68     @Before
setUpClients()69     public void setUpClients() {
70         mPublisher = getMockPublisherClient();
71         mSubscriber = getSubscriberManager();
72         mSubscriberClient = getMockSubscriberClient();
73     }
74 
75     @Test
testPublisherInfo_Unregistered()76     public void testPublisherInfo_Unregistered() {
77         assertEquals(0, mSubscriber.getPublisherInfo(UNKNOWN_PUBLISHER).length);
78     }
79 
80     @Test
testPublisherInfo_Registered()81     public void testPublisherInfo_Registered() {
82         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
83         assertArrayEquals(PUBLISHER_INFO, mSubscriber.getPublisherInfo(publisherId));
84     }
85 
86     @Test
testPublisherId_AlreadyRegistered()87     public void testPublisherId_AlreadyRegistered() {
88         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
89         int publisherId2 = mPublisher.getPublisherId(PUBLISHER_INFO);
90         assertEquals(publisherId, publisherId2);
91     }
92 
93     @Test
testPublisherId_MultiplePublishers()94     public void testPublisherId_MultiplePublishers() {
95         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
96         int publisherId2 = mPublisher.getPublisherId(PUBLISHER_INFO_OTHER);
97         assertNotEquals(publisherId, publisherId2);
98         assertArrayEquals(PUBLISHER_INFO, mSubscriber.getPublisherInfo(publisherId));
99         assertArrayEquals(PUBLISHER_INFO_OTHER, mSubscriber.getPublisherInfo(publisherId2));
100     }
101 
102     @Test
testLayerAvailability_Default()103     public void testLayerAvailability_Default() {
104         VmsAvailableLayers availableLayers = mSubscriber.getAvailableLayers();
105         assertEquals(Collections.emptySet(), availableLayers.getAssociatedLayers());
106         assertEquals(0, availableLayers.getSequence());
107     }
108 
109     @Test
testLayerAvailability()110     public void testLayerAvailability() {
111         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
112         mPublisher.setLayersOffering(new VmsLayersOffering(Collections.singleton(
113                 new VmsLayerDependency(SUBSCRIPTION_LAYER, Collections.emptySet())), publisherId));
114 
115         assertLayerAvailability(1,
116                 new VmsAssociatedLayer(SUBSCRIPTION_LAYER, Collections.singleton(publisherId)));
117     }
118 
119     @Test
testLayerAvailability_Overwrite()120     public void testLayerAvailability_Overwrite() {
121         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
122         mPublisher.setLayersOffering(new VmsLayersOffering(Collections.singleton(
123                 new VmsLayerDependency(SUBSCRIPTION_LAYER, Collections.emptySet())), publisherId));
124         mPublisher.setLayersOffering(new VmsLayersOffering(Collections.singleton(
125                 new VmsLayerDependency(SUBSCRIPTION_LAYER_OTHER, Collections.emptySet())),
126                 publisherId));
127 
128         assertLayerAvailability(2,
129                 new VmsAssociatedLayer(SUBSCRIPTION_LAYER_OTHER,
130                         Collections.singleton(publisherId)));
131     }
132 
133     @Test
testLayerAvailability_MultiplePublishers_SameLayer()134     public void testLayerAvailability_MultiplePublishers_SameLayer() {
135         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
136         int publisherId2 = mPublisher.getPublisherId(PUBLISHER_INFO_OTHER);
137         mPublisher.setLayersOffering(new VmsLayersOffering(Collections.singleton(
138                 new VmsLayerDependency(SUBSCRIPTION_LAYER, Collections.emptySet())), publisherId));
139         mPublisher.setLayersOffering(new VmsLayersOffering(Collections.singleton(
140                 new VmsLayerDependency(SUBSCRIPTION_LAYER, Collections.emptySet())), publisherId2));
141 
142         assertLayerAvailability(2,
143                 new VmsAssociatedLayer(SUBSCRIPTION_LAYER,
144                         new HashSet<>(Arrays.asList(publisherId, publisherId2))));
145     }
146 
147     @Test
testLayerAvailability_MultiplePublishers_MultipleLayers()148     public void testLayerAvailability_MultiplePublishers_MultipleLayers() {
149         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
150         int publisherId2 = mPublisher.getPublisherId(PUBLISHER_INFO_OTHER);
151         mPublisher.setLayersOffering(new VmsLayersOffering(Collections.singleton(
152                 new VmsLayerDependency(SUBSCRIPTION_LAYER, Collections.emptySet())), publisherId));
153         mPublisher.setLayersOffering(new VmsLayersOffering(Collections.singleton(
154                 new VmsLayerDependency(SUBSCRIPTION_LAYER_OTHER, Collections.emptySet())),
155                 publisherId2));
156 
157         assertLayerAvailability(2,
158                 new VmsAssociatedLayer(SUBSCRIPTION_LAYER, Collections.singleton(publisherId)),
159                 new VmsAssociatedLayer(SUBSCRIPTION_LAYER_OTHER,
160                         Collections.singleton(publisherId2)));
161     }
162 
163     @Test
testLayerAvailability_MultiplePublishers_Remove()164     public void testLayerAvailability_MultiplePublishers_Remove() {
165         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
166         int publisherId2 = mPublisher.getPublisherId(PUBLISHER_INFO_OTHER);
167         mPublisher.setLayersOffering(new VmsLayersOffering(Collections.singleton(
168                 new VmsLayerDependency(SUBSCRIPTION_LAYER, Collections.emptySet())), publisherId));
169         mPublisher.setLayersOffering(new VmsLayersOffering(Collections.singleton(
170                 new VmsLayerDependency(SUBSCRIPTION_LAYER, Collections.emptySet())), publisherId2));
171 
172         mPublisher.setLayersOffering(new VmsLayersOffering(Collections.emptySet(), publisherId2));
173 
174         assertLayerAvailability(3,
175                 new VmsAssociatedLayer(SUBSCRIPTION_LAYER, Collections.singleton(publisherId)));
176     }
177 
178     @Test
testLayerAvailability_MultiplePublishers_Overwrite()179     public void testLayerAvailability_MultiplePublishers_Overwrite() {
180         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
181         int publisherId2 = mPublisher.getPublisherId(PUBLISHER_INFO_OTHER);
182         mPublisher.setLayersOffering(new VmsLayersOffering(Collections.singleton(
183                 new VmsLayerDependency(SUBSCRIPTION_LAYER, Collections.emptySet())), publisherId));
184         mPublisher.setLayersOffering(new VmsLayersOffering(Collections.singleton(
185                 new VmsLayerDependency(SUBSCRIPTION_LAYER, Collections.emptySet())), publisherId2));
186 
187         mPublisher.setLayersOffering(new VmsLayersOffering(Collections.singleton(
188                 new VmsLayerDependency(SUBSCRIPTION_LAYER_OTHER, Collections.emptySet())),
189                 publisherId2));
190 
191         assertLayerAvailability(3,
192                 new VmsAssociatedLayer(SUBSCRIPTION_LAYER, Collections.singleton(publisherId)),
193                 new VmsAssociatedLayer(SUBSCRIPTION_LAYER_OTHER,
194                         Collections.singleton(publisherId2)));
195     }
196 
197     @Test
testStartMonitoring()198     public void testStartMonitoring() {
199         mSubscriber.startMonitoring();
200         assertNull(mPublisher.receiveSubscriptionState());
201 
202         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
203         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
204         assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD);
205     }
206 
207     @Test
testStartMonitoring_AfterPublish()208     public void testStartMonitoring_AfterPublish() {
209         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
210         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
211 
212         mSubscriber.startMonitoring();
213         assertNull(mSubscriberClient.receiveMessage());
214     }
215 
216     @Test
testStartMonitoring_MultipleLayers()217     public void testStartMonitoring_MultipleLayers() {
218         mSubscriber.startMonitoring();
219 
220         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
221         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
222         assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD);
223 
224         mPublisher.publish(SUBSCRIPTION_LAYER_OTHER, publisherId, PAYLOAD_OTHER);
225         assertDataMessage(SUBSCRIPTION_LAYER_OTHER, PAYLOAD_OTHER);
226     }
227 
228     @Test
testStartMonitoring_MultiplePublishers()229     public void testStartMonitoring_MultiplePublishers() {
230         mSubscriber.startMonitoring();
231 
232         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
233         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
234         assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD);
235 
236         int publisherId2 = mPublisher.getPublisherId(PUBLISHER_INFO_OTHER);
237         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId2, PAYLOAD_OTHER);
238         assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD_OTHER);
239     }
240 
241     @Test
testStopMonitoring()242     public void testStopMonitoring() {
243         mSubscriber.startMonitoring();
244         mSubscriber.stopMonitoring();
245         assertNull(mPublisher.receiveSubscriptionState());
246 
247         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
248         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
249         assertNull(mSubscriberClient.receiveMessage());
250     }
251 
252     @Test
253     @RequiresDevice
testSubscribe()254     public void testSubscribe() {
255         mSubscriber.subscribe(SUBSCRIPTION_LAYER);
256         assertSubscriptionState(1, SUBSCRIPTION_LAYER);
257 
258         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
259         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
260         assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD);
261     }
262 
263     @Test
testSubscribe_AfterPublish()264     public void testSubscribe_AfterPublish() {
265         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
266 
267         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
268         mSubscriber.subscribe(SUBSCRIPTION_LAYER);
269 
270         assertSubscriptionState(1, SUBSCRIPTION_LAYER);
271         assertNull(mSubscriberClient.receiveMessage());
272     }
273 
274     @Test
testSubscribe_MultipleLayers()275     public void testSubscribe_MultipleLayers() {
276         mSubscriber.subscribe(SUBSCRIPTION_LAYER);
277         mSubscriber.subscribe(SUBSCRIPTION_LAYER_OTHER);
278         assertSubscriptionState(2, SUBSCRIPTION_LAYER, SUBSCRIPTION_LAYER_OTHER);
279 
280         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
281         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
282         assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD);
283 
284         mPublisher.publish(SUBSCRIPTION_LAYER_OTHER, publisherId, PAYLOAD_OTHER);
285         assertDataMessage(SUBSCRIPTION_LAYER_OTHER, PAYLOAD_OTHER);
286     }
287 
288     @Test
testSubscribe_MultiplePublishers()289     public void testSubscribe_MultiplePublishers() {
290         mSubscriber.subscribe(SUBSCRIPTION_LAYER);
291         assertSubscriptionState(1, SUBSCRIPTION_LAYER);
292 
293         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
294         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
295         assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD);
296 
297         int publisherId2 = mPublisher.getPublisherId(PUBLISHER_INFO_OTHER);
298         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId2, PAYLOAD_OTHER);
299         assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD_OTHER);
300     }
301 
302     @Test
testSubscribe_MultipleLayers_MultiplePublishers()303     public void testSubscribe_MultipleLayers_MultiplePublishers() {
304         mSubscriber.subscribe(SUBSCRIPTION_LAYER);
305         mSubscriber.subscribe(SUBSCRIPTION_LAYER_OTHER);
306         assertSubscriptionState(2, SUBSCRIPTION_LAYER, SUBSCRIPTION_LAYER_OTHER);
307 
308         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
309         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
310         assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD);
311 
312         int publisherId2 = mPublisher.getPublisherId(PUBLISHER_INFO_OTHER);
313         mPublisher.publish(SUBSCRIPTION_LAYER_OTHER, publisherId2, PAYLOAD_OTHER);
314         assertDataMessage(SUBSCRIPTION_LAYER_OTHER, PAYLOAD_OTHER);
315     }
316 
317     @Test
testSubscribe_ClearCallback()318     public void testSubscribe_ClearCallback() {
319         mSubscriber.subscribe(SUBSCRIPTION_LAYER);
320         mSubscriber.clearVmsSubscriberClientCallback();
321 
322         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
323         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
324         assertNull(mSubscriberClient.receiveMessage());
325     }
326 
327     @Test(expected = IllegalStateException.class)
testSubscribe_NoCallback()328     public void testSubscribe_NoCallback() {
329         mSubscriber.clearVmsSubscriberClientCallback();
330         mSubscriber.subscribe(SUBSCRIPTION_LAYER);
331     }
332 
333     @Test
testUnsubscribe()334     public void testUnsubscribe() {
335         mSubscriber.subscribe(SUBSCRIPTION_LAYER);
336         mSubscriber.unsubscribe(SUBSCRIPTION_LAYER);
337         assertSubscriptionState(2, Collections.emptySet(), Collections.emptySet());
338 
339         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
340         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
341         assertNull(mSubscriberClient.receiveMessage());
342     }
343 
344     @Test
testUnsubscribe_MultipleLayers()345     public void testUnsubscribe_MultipleLayers() {
346         mSubscriber.subscribe(SUBSCRIPTION_LAYER);
347         mSubscriber.subscribe(SUBSCRIPTION_LAYER_OTHER);
348         mSubscriber.unsubscribe(SUBSCRIPTION_LAYER);
349         assertSubscriptionState(3, SUBSCRIPTION_LAYER_OTHER);
350 
351         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
352         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
353         assertNull(mSubscriberClient.receiveMessage());
354 
355         mPublisher.publish(SUBSCRIPTION_LAYER_OTHER, publisherId, PAYLOAD_OTHER);
356         assertDataMessage(SUBSCRIPTION_LAYER_OTHER, PAYLOAD_OTHER);
357     }
358 
359     @Test
testUnsubscribe_MultiplePublishers()360     public void testUnsubscribe_MultiplePublishers() {
361         mSubscriber.subscribe(SUBSCRIPTION_LAYER);
362         mSubscriber.subscribe(SUBSCRIPTION_LAYER_OTHER);
363         mSubscriber.unsubscribe(SUBSCRIPTION_LAYER);
364         assertSubscriptionState(3, SUBSCRIPTION_LAYER_OTHER);
365 
366         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
367         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
368         assertNull(mSubscriberClient.receiveMessage());
369 
370         int publisherId2 = mPublisher.getPublisherId(PUBLISHER_INFO_OTHER);
371         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId2, PAYLOAD_OTHER);
372         assertNull(mSubscriberClient.receiveMessage());
373     }
374 
375     @Test
testUnsubscribe_NotSubscribed()376     public void testUnsubscribe_NotSubscribed() {
377         mSubscriber.unsubscribe(SUBSCRIPTION_LAYER);
378         assertNull(mPublisher.receiveSubscriptionState());
379 
380         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
381         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
382         assertNull(mSubscriberClient.receiveMessage());
383     }
384 
385     @Test
testSubscribeToPublisher()386     public void testSubscribeToPublisher() {
387         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
388 
389         mSubscriber.subscribe(SUBSCRIPTION_LAYER, publisherId);
390         assertSubscriptionState(1,
391                 new VmsAssociatedLayer(SUBSCRIPTION_LAYER, Collections.singleton(publisherId)));
392 
393         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
394         assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD);
395     }
396 
397     @Test
testSubscribeToPublisher_AfterPublish()398     public void testSubscribeToPublisher_AfterPublish() {
399         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
400 
401         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
402         mSubscriber.subscribe(SUBSCRIPTION_LAYER, publisherId);
403 
404         assertSubscriptionState(1,
405                 new VmsAssociatedLayer(SUBSCRIPTION_LAYER, Collections.singleton(publisherId)));
406 
407         assertNull(mSubscriberClient.receiveMessage());
408     }
409 
410     @Test
testSubscribeToPublisher_MultipleLayers()411     public void testSubscribeToPublisher_MultipleLayers() {
412         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
413 
414         mSubscriber.subscribe(SUBSCRIPTION_LAYER, publisherId);
415         mSubscriber.subscribe(SUBSCRIPTION_LAYER_OTHER, publisherId);
416 
417         assertSubscriptionState(2,
418                 new VmsAssociatedLayer(SUBSCRIPTION_LAYER, Collections.singleton(publisherId)),
419                 new VmsAssociatedLayer(SUBSCRIPTION_LAYER_OTHER,
420                         Collections.singleton(publisherId)));
421 
422         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
423         assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD);
424 
425         mPublisher.publish(SUBSCRIPTION_LAYER_OTHER, publisherId, PAYLOAD_OTHER);
426         assertDataMessage(SUBSCRIPTION_LAYER_OTHER, PAYLOAD_OTHER);
427     }
428 
429     @Test
testSubscribeToPublisher_MultiplePublishers()430     public void testSubscribeToPublisher_MultiplePublishers() {
431         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
432         int publisherId2 = mPublisher.getPublisherId(PUBLISHER_INFO_OTHER);
433 
434         mSubscriber.subscribe(SUBSCRIPTION_LAYER, publisherId);
435         assertSubscriptionState(1,
436                 new VmsAssociatedLayer(SUBSCRIPTION_LAYER, Collections.singleton(publisherId)));
437 
438         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
439         assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD);
440 
441         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId2, PAYLOAD_OTHER);
442         assertNull(mSubscriberClient.receiveMessage());
443     }
444 
445     @Test
testSubscribeToPublisher_MultipleLayers_MultiplePublishers()446     public void testSubscribeToPublisher_MultipleLayers_MultiplePublishers() {
447         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
448         int publisherId2 = mPublisher.getPublisherId(PUBLISHER_INFO_OTHER);
449 
450         mSubscriber.subscribe(SUBSCRIPTION_LAYER, publisherId);
451         mSubscriber.subscribe(SUBSCRIPTION_LAYER_OTHER, publisherId2);
452         assertSubscriptionState(2,
453                 new VmsAssociatedLayer(SUBSCRIPTION_LAYER, Collections.singleton(publisherId)),
454                 new VmsAssociatedLayer(SUBSCRIPTION_LAYER_OTHER,
455                         Collections.singleton(publisherId2)));
456 
457         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
458         assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD);
459 
460         mPublisher.publish(SUBSCRIPTION_LAYER_OTHER, publisherId2, PAYLOAD_OTHER);
461         assertDataMessage(SUBSCRIPTION_LAYER_OTHER, PAYLOAD_OTHER);
462     }
463 
464     @Test
testSubscribeToPublisher_UnknownPublisher()465     public void testSubscribeToPublisher_UnknownPublisher() {
466         mSubscriber.subscribe(SUBSCRIPTION_LAYER, UNKNOWN_PUBLISHER);
467 
468         assertSubscriptionState(1,
469                 new VmsAssociatedLayer(SUBSCRIPTION_LAYER,
470                         Collections.singleton(UNKNOWN_PUBLISHER)));
471     }
472 
473     @Test
testSubscribeToPublisher_ClearCallback()474     public void testSubscribeToPublisher_ClearCallback() {
475         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
476 
477         mSubscriber.subscribe(SUBSCRIPTION_LAYER, publisherId);
478         mSubscriber.clearVmsSubscriberClientCallback();
479 
480         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
481         assertNull(mSubscriberClient.receiveMessage());
482     }
483 
484     @Test(expected = IllegalStateException.class)
testSubscribeToPublisher_NoCallback()485     public void testSubscribeToPublisher_NoCallback() {
486         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
487 
488         mSubscriber.clearVmsSubscriberClientCallback();
489         mSubscriber.subscribe(SUBSCRIPTION_LAYER, publisherId);
490     }
491 
492     @Test
testUnsubscribeToPublisher()493     public void testUnsubscribeToPublisher() {
494         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
495 
496         mSubscriber.subscribe(SUBSCRIPTION_LAYER, publisherId);
497         mSubscriber.unsubscribe(SUBSCRIPTION_LAYER, publisherId);
498         assertSubscriptionState(2, Collections.emptySet(), Collections.emptySet());
499 
500         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
501         assertNull(mSubscriberClient.receiveMessage());
502     }
503 
504     @Test
testUnsubscribeToPublisher_MultipleLayers()505     public void testUnsubscribeToPublisher_MultipleLayers() {
506         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
507 
508         mSubscriber.subscribe(SUBSCRIPTION_LAYER, publisherId);
509         mSubscriber.subscribe(SUBSCRIPTION_LAYER_OTHER, publisherId);
510         mSubscriber.unsubscribe(SUBSCRIPTION_LAYER, publisherId);
511         assertSubscriptionState(3,
512                 new VmsAssociatedLayer(SUBSCRIPTION_LAYER_OTHER,
513                         Collections.singleton(publisherId)));
514 
515         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
516         assertNull(mSubscriberClient.receiveMessage());
517 
518         mPublisher.publish(SUBSCRIPTION_LAYER_OTHER, publisherId, PAYLOAD_OTHER);
519         assertDataMessage(SUBSCRIPTION_LAYER_OTHER, PAYLOAD_OTHER);
520     }
521 
522     @Test
523     @FlakyTest
testUnsubscribeToPublisher_MultiplePublishers()524     public void testUnsubscribeToPublisher_MultiplePublishers() {
525         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
526         int publisherId2 = mPublisher.getPublisherId(PUBLISHER_INFO_OTHER);
527 
528         mSubscriber.subscribe(SUBSCRIPTION_LAYER, publisherId);
529         mSubscriber.subscribe(SUBSCRIPTION_LAYER, publisherId2);
530         mSubscriber.unsubscribe(SUBSCRIPTION_LAYER, publisherId);
531         assertSubscriptionState(3,
532                 new VmsAssociatedLayer(SUBSCRIPTION_LAYER, Collections.singleton(publisherId2)));
533 
534         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
535         assertNull(mSubscriberClient.receiveMessage());
536 
537         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId2, PAYLOAD_OTHER);
538         assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD_OTHER);
539     }
540 
541     @Test
testUnsubscribeToPublisher_NotSubscribed()542     public void testUnsubscribeToPublisher_NotSubscribed() {
543         int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
544 
545         mSubscriber.unsubscribe(SUBSCRIPTION_LAYER, publisherId);
546         assertNull(mPublisher.receiveSubscriptionState());
547 
548         mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
549         assertNull(mSubscriberClient.receiveMessage());
550     }
551 
552     @Test
testUnsubscribeToPublisher_UnknownPublisher()553     public void testUnsubscribeToPublisher_UnknownPublisher() {
554         mSubscriber.unsubscribe(SUBSCRIPTION_LAYER, UNKNOWN_PUBLISHER);
555         assertNull(mPublisher.receiveSubscriptionState());
556     }
557 
assertLayerAvailability(int sequenceNumber, VmsAssociatedLayer... associatedLayers)558     private void assertLayerAvailability(int sequenceNumber,
559             VmsAssociatedLayer... associatedLayers) {
560         VmsAvailableLayers availableLayers = receiveWithSequence(
561                 getMockSubscriberClient()::receiveLayerAvailability,
562                 VmsAvailableLayers::getSequence,
563                 sequenceNumber);
564         assertEquals(availableLayers, mSubscriber.getAvailableLayers());
565         assertEquals(new HashSet<>(Arrays.asList(associatedLayers)),
566                 availableLayers.getAssociatedLayers());
567     }
568 
assertSubscriptionState(int sequenceNumber, VmsLayer... layers)569     private void assertSubscriptionState(int sequenceNumber, VmsLayer... layers) {
570         assertSubscriptionState(sequenceNumber, new HashSet<>(Arrays.asList(layers)),
571                 Collections.emptySet());
572     }
573 
assertSubscriptionState(int sequenceNumber, VmsAssociatedLayer... associatedLayers)574     private void assertSubscriptionState(int sequenceNumber,
575             VmsAssociatedLayer... associatedLayers) {
576         assertSubscriptionState(sequenceNumber, Collections.emptySet(),
577                 new HashSet<>(Arrays.asList(associatedLayers)));
578     }
579 
assertSubscriptionState(int sequenceNumber, Set<VmsLayer> layers, Set<VmsAssociatedLayer> associatedLayers)580     private void assertSubscriptionState(int sequenceNumber, Set<VmsLayer> layers,
581             Set<VmsAssociatedLayer> associatedLayers) {
582         VmsSubscriptionState subscriptionState = receiveWithSequence(
583                 mPublisher::receiveSubscriptionState,
584                 VmsSubscriptionState::getSequenceNumber,
585                 sequenceNumber);
586         assertEquals(layers, subscriptionState.getLayers());
587         assertEquals(associatedLayers, subscriptionState.getAssociatedLayers());
588     }
589 
receiveWithSequence(Supplier<T> supplierFunction, ToIntFunction<T> sequenceNumberFn, int sequenceNumber)590     private static <T> T receiveWithSequence(Supplier<T> supplierFunction,
591             ToIntFunction<T> sequenceNumberFn, int sequenceNumber) {
592         T obj = null;
593         for (int seq = 1; seq <= sequenceNumber; seq++) {
594             obj = supplierFunction.get();
595             assertNotNull(obj);
596             assertEquals(seq, sequenceNumberFn.applyAsInt(obj));
597         }
598         return obj;
599     }
600 
assertDataMessage(VmsLayer layer, byte[] payload)601     private void assertDataMessage(VmsLayer layer, byte[] payload) {
602         Pair<VmsLayer, byte[]> message = mSubscriberClient.receiveMessage();
603         assertEquals(layer, message.first);
604         assertArrayEquals(payload, message.second);
605     }
606 }
607