1 /*
2  * Copyright 2014 Google Inc. All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 import MyGame.Example.Monster;
18 import MyGame.Example.MonsterStorageGrpc;
19 import MyGame.Example.Stat;
20 import com.google.flatbuffers.FlatBufferBuilder;
21 import io.grpc.ManagedChannel;
22 import io.grpc.ManagedChannelBuilder;
23 import io.grpc.Server;
24 import io.grpc.ServerBuilder;
25 import io.grpc.stub.StreamObserver;
26 import org.junit.Assert;
27 
28 import java.io.IOException;
29 import java.lang.InterruptedException;
30 import java.nio.ByteBuffer;
31 import java.util.Iterator;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicReference;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.concurrent.CountDownLatch;
36 
37 
38 /**
39  * Demonstrates basic client-server interaction using grpc-java over netty.
40  */
41 public class JavaGrpcTest {
42     static final String BIG_MONSTER_NAME = "Cyberdemon";
43     static final short nestedMonsterHp = 600;
44     static final short nestedMonsterMana = 1024;
45     static final int numStreamedMsgs = 10;
46     static final int timeoutMs = 3000;
47     static Server server;
48     static ManagedChannel channel;
49     static MonsterStorageGrpc.MonsterStorageBlockingStub blockingStub;
50     static MonsterStorageGrpc.MonsterStorageStub asyncStub;
51 
52     static class MyService extends MonsterStorageGrpc.MonsterStorageImplBase {
53         @Override
store(Monster request, io.grpc.stub.StreamObserver<Stat> responseObserver)54         public void store(Monster request, io.grpc.stub.StreamObserver<Stat> responseObserver) {
55             Assert.assertEquals(request.name(), BIG_MONSTER_NAME);
56             Assert.assertEquals(request.hp(), nestedMonsterHp);
57             Assert.assertEquals(request.mana(), nestedMonsterMana);
58             System.out.println("Received store request from " + request.name());
59             // Create a response from the incoming request name.
60             Stat stat = GameFactory.createStat("Hello " + request.name(), 100, 10);
61             responseObserver.onNext(stat);
62             responseObserver.onCompleted();
63         }
64 
65         @Override
retrieve(Stat request, io.grpc.stub.StreamObserver<Monster> responseObserver)66         public void retrieve(Stat request, io.grpc.stub.StreamObserver<Monster> responseObserver) {
67             // Create 10 monsters for streaming response.
68             for (int i=0; i<numStreamedMsgs; i++) {
69                 Monster monster = GameFactory.createMonsterFromStat(request, i);
70                 responseObserver.onNext(monster);
71             }
72             responseObserver.onCompleted();
73         }
74 
75         @Override
getMaxHitPoint(final StreamObserver<Stat> responseObserver)76         public StreamObserver<Monster> getMaxHitPoint(final StreamObserver<Stat> responseObserver) {
77           return computeMinMax(responseObserver, false);
78         }
79 
80         @Override
getMinMaxHitPoints(final StreamObserver<Stat> responseObserver)81         public StreamObserver<Monster> getMinMaxHitPoints(final StreamObserver<Stat> responseObserver) {
82           return computeMinMax(responseObserver, true);
83         }
84 
computeMinMax(final StreamObserver<Stat> responseObserver, final boolean includeMin)85         private StreamObserver<Monster> computeMinMax(final StreamObserver<Stat> responseObserver, final boolean includeMin) {
86           final AtomicInteger maxHp = new AtomicInteger(Integer.MIN_VALUE);
87           final AtomicReference<String> maxHpMonsterName = new AtomicReference<String>();
88           final AtomicInteger maxHpCount = new AtomicInteger();
89 
90           final AtomicInteger minHp = new AtomicInteger(Integer.MAX_VALUE);
91           final AtomicReference<String> minHpMonsterName = new AtomicReference<String>();
92           final AtomicInteger minHpCount = new AtomicInteger();
93 
94           return new StreamObserver<Monster>() {
95             public void onNext(Monster monster) {
96               if (monster.hp() > maxHp.get()) {
97                 // Found a monster of higher hit points.
98                 maxHp.set(monster.hp());
99                 maxHpMonsterName.set(monster.name());
100                 maxHpCount.set(1);
101               }
102               else if (monster.hp() == maxHp.get()) {
103                 // Count how many times we saw a monster of current max hit points.
104                 maxHpCount.getAndIncrement();
105               }
106 
107               if (monster.hp() < minHp.get()) {
108                 // Found a monster of a lower hit points.
109                 minHp.set(monster.hp());
110                 minHpMonsterName.set(monster.name());
111                 minHpCount.set(1);
112               }
113               else if (monster.hp() == minHp.get()) {
114                 // Count how many times we saw a monster of current min hit points.
115                 minHpCount.getAndIncrement();
116               }
117             }
118             public void onCompleted() {
119               Stat maxHpStat = GameFactory.createStat(maxHpMonsterName.get(), maxHp.get(), maxHpCount.get());
120               // Send max hit points first.
121               responseObserver.onNext(maxHpStat);
122               if (includeMin) {
123                 // Send min hit points.
124                 Stat minHpStat = GameFactory.createStat(minHpMonsterName.get(), minHp.get(), minHpCount.get());
125                 responseObserver.onNext(minHpStat);
126               }
127               responseObserver.onCompleted();
128             }
129             public void onError(Throwable t) {
130               // Not expected
131               Assert.fail();
132             };
133           };
134         }
135     }
136 
137     @org.junit.BeforeClass
138     public static void startServer() throws IOException {
139         server = ServerBuilder.forPort(0).addService(new MyService()).build().start();
140         int port = server.getPort();
141         channel = ManagedChannelBuilder.forAddress("localhost", port)
142                 // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
143                 // needing certificates.
144                 .usePlaintext(true)
145                 .directExecutor()
146                 .build();
147         blockingStub = MonsterStorageGrpc.newBlockingStub(channel);
148         asyncStub = MonsterStorageGrpc.newStub(channel);
149     }
150 
151     @org.junit.Test
152     public void testUnary() throws IOException {
153         Monster monsterRequest = GameFactory.createMonster(BIG_MONSTER_NAME, nestedMonsterHp, nestedMonsterMana);
154         Stat stat = blockingStub.store(monsterRequest);
155         Assert.assertEquals(stat.id(), "Hello " + BIG_MONSTER_NAME);
156         System.out.println("Received stat response from service: " + stat.id());
157     }
158 
159     @org.junit.Test
160     public void testServerStreaming() throws IOException {
161         Monster monsterRequest = GameFactory.createMonster(BIG_MONSTER_NAME, nestedMonsterHp, nestedMonsterMana);
162         Stat stat = blockingStub.store(monsterRequest);
163         Iterator<Monster> iterator = blockingStub.retrieve(stat);
164         int counter = 0;
165         while(iterator.hasNext()) {
166             Monster m = iterator.next();
167             System.out.println("Received monster " + m.name());
168             counter ++;
169         }
170         Assert.assertEquals(counter, numStreamedMsgs);
171         System.out.println("FlatBuffers GRPC client/server test: completed successfully");
172     }
173 
174     @org.junit.Test
175     public void testClientStreaming() throws IOException, InterruptedException {
176       final AtomicReference<Stat> maxHitStat = new AtomicReference<Stat>();
177       final CountDownLatch streamAlive = new CountDownLatch(1);
178 
179       StreamObserver<Stat> statObserver = new StreamObserver<Stat>() {
180         public void onCompleted() {
181           streamAlive.countDown();
182         }
183         public void onError(Throwable ex) { }
184         public void onNext(Stat stat) {
185           maxHitStat.set(stat);
186         }
187       };
188       StreamObserver<Monster> monsterStream = asyncStub.getMaxHitPoint(statObserver);
189       short count = 10;
190       for (short i = 0;i < count; ++i) {
191         Monster monster = GameFactory.createMonster(BIG_MONSTER_NAME + i, (short) (nestedMonsterHp * i), nestedMonsterMana);
192         monsterStream.onNext(monster);
193       }
194       monsterStream.onCompleted();
195       // Wait a little bit for the server to send the stats of the monster with the max hit-points.
196       streamAlive.await(timeoutMs, TimeUnit.MILLISECONDS);
197       Assert.assertEquals(maxHitStat.get().id(), BIG_MONSTER_NAME + (count - 1));
198       Assert.assertEquals(maxHitStat.get().val(), nestedMonsterHp * (count - 1));
199       Assert.assertEquals(maxHitStat.get().count(), 1);
200     }
201 
202     @org.junit.Test
203     public void testBiDiStreaming() throws IOException, InterruptedException {
204       final AtomicReference<Stat> maxHitStat = new AtomicReference<Stat>();
205       final AtomicReference<Stat> minHitStat = new AtomicReference<Stat>();
206       final CountDownLatch streamAlive = new CountDownLatch(1);
207 
208       StreamObserver<Stat> statObserver = new StreamObserver<Stat>() {
209         public void onCompleted() {
210           streamAlive.countDown();
211         }
212         public void onError(Throwable ex) { }
213         public void onNext(Stat stat) {
214           // We expect the server to send the max stat first and then the min stat.
215           if (maxHitStat.get() == null) {
216             maxHitStat.set(stat);
217           }
218           else {
219             minHitStat.set(stat);
220           }
221         }
222       };
223       StreamObserver<Monster> monsterStream = asyncStub.getMinMaxHitPoints(statObserver);
224       short count = 10;
225       for (short i = 0;i < count; ++i) {
226         Monster monster = GameFactory.createMonster(BIG_MONSTER_NAME + i, (short) (nestedMonsterHp * i), nestedMonsterMana);
227         monsterStream.onNext(monster);
228       }
229       monsterStream.onCompleted();
230 
231       // Wait a little bit for the server to send the stats of the monster with the max and min hit-points.
232       streamAlive.await(timeoutMs, TimeUnit.MILLISECONDS);
233 
234       Assert.assertEquals(maxHitStat.get().id(), BIG_MONSTER_NAME + (count - 1));
235       Assert.assertEquals(maxHitStat.get().val(), nestedMonsterHp * (count - 1));
236       Assert.assertEquals(maxHitStat.get().count(), 1);
237 
238       Assert.assertEquals(minHitStat.get().id(), BIG_MONSTER_NAME + 0);
239       Assert.assertEquals(minHitStat.get().val(), nestedMonsterHp * 0);
240       Assert.assertEquals(minHitStat.get().count(), 1);
241     }
242 }
243