1 /* 2 * Copyright 2014 Google Inc. All rights reserved. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 import MyGame.Example.Monster; 18 import MyGame.Example.MonsterStorageGrpc; 19 import MyGame.Example.Stat; 20 import com.google.flatbuffers.FlatBufferBuilder; 21 import io.grpc.ManagedChannel; 22 import io.grpc.ManagedChannelBuilder; 23 import io.grpc.Server; 24 import io.grpc.ServerBuilder; 25 import io.grpc.stub.StreamObserver; 26 import org.junit.Assert; 27 28 import java.io.IOException; 29 import java.lang.InterruptedException; 30 import java.nio.ByteBuffer; 31 import java.util.Iterator; 32 import java.util.concurrent.TimeUnit; 33 import java.util.concurrent.atomic.AtomicReference; 34 import java.util.concurrent.atomic.AtomicInteger; 35 import java.util.concurrent.CountDownLatch; 36 37 38 /** 39 * Demonstrates basic client-server interaction using grpc-java over netty. 40 */ 41 public class JavaGrpcTest { 42 static final String BIG_MONSTER_NAME = "Cyberdemon"; 43 static final short nestedMonsterHp = 600; 44 static final short nestedMonsterMana = 1024; 45 static final int numStreamedMsgs = 10; 46 static final int timeoutMs = 3000; 47 static Server server; 48 static ManagedChannel channel; 49 static MonsterStorageGrpc.MonsterStorageBlockingStub blockingStub; 50 static MonsterStorageGrpc.MonsterStorageStub asyncStub; 51 52 static class MyService extends MonsterStorageGrpc.MonsterStorageImplBase { 53 @Override store(Monster request, io.grpc.stub.StreamObserver<Stat> responseObserver)54 public void store(Monster request, io.grpc.stub.StreamObserver<Stat> responseObserver) { 55 Assert.assertEquals(request.name(), BIG_MONSTER_NAME); 56 Assert.assertEquals(request.hp(), nestedMonsterHp); 57 Assert.assertEquals(request.mana(), nestedMonsterMana); 58 System.out.println("Received store request from " + request.name()); 59 // Create a response from the incoming request name. 60 Stat stat = GameFactory.createStat("Hello " + request.name(), 100, 10); 61 responseObserver.onNext(stat); 62 responseObserver.onCompleted(); 63 } 64 65 @Override retrieve(Stat request, io.grpc.stub.StreamObserver<Monster> responseObserver)66 public void retrieve(Stat request, io.grpc.stub.StreamObserver<Monster> responseObserver) { 67 // Create 10 monsters for streaming response. 68 for (int i=0; i<numStreamedMsgs; i++) { 69 Monster monster = GameFactory.createMonsterFromStat(request, i); 70 responseObserver.onNext(monster); 71 } 72 responseObserver.onCompleted(); 73 } 74 75 @Override getMaxHitPoint(final StreamObserver<Stat> responseObserver)76 public StreamObserver<Monster> getMaxHitPoint(final StreamObserver<Stat> responseObserver) { 77 return computeMinMax(responseObserver, false); 78 } 79 80 @Override getMinMaxHitPoints(final StreamObserver<Stat> responseObserver)81 public StreamObserver<Monster> getMinMaxHitPoints(final StreamObserver<Stat> responseObserver) { 82 return computeMinMax(responseObserver, true); 83 } 84 computeMinMax(final StreamObserver<Stat> responseObserver, final boolean includeMin)85 private StreamObserver<Monster> computeMinMax(final StreamObserver<Stat> responseObserver, final boolean includeMin) { 86 final AtomicInteger maxHp = new AtomicInteger(Integer.MIN_VALUE); 87 final AtomicReference<String> maxHpMonsterName = new AtomicReference<String>(); 88 final AtomicInteger maxHpCount = new AtomicInteger(); 89 90 final AtomicInteger minHp = new AtomicInteger(Integer.MAX_VALUE); 91 final AtomicReference<String> minHpMonsterName = new AtomicReference<String>(); 92 final AtomicInteger minHpCount = new AtomicInteger(); 93 94 return new StreamObserver<Monster>() { 95 public void onNext(Monster monster) { 96 if (monster.hp() > maxHp.get()) { 97 // Found a monster of higher hit points. 98 maxHp.set(monster.hp()); 99 maxHpMonsterName.set(monster.name()); 100 maxHpCount.set(1); 101 } 102 else if (monster.hp() == maxHp.get()) { 103 // Count how many times we saw a monster of current max hit points. 104 maxHpCount.getAndIncrement(); 105 } 106 107 if (monster.hp() < minHp.get()) { 108 // Found a monster of a lower hit points. 109 minHp.set(monster.hp()); 110 minHpMonsterName.set(monster.name()); 111 minHpCount.set(1); 112 } 113 else if (monster.hp() == minHp.get()) { 114 // Count how many times we saw a monster of current min hit points. 115 minHpCount.getAndIncrement(); 116 } 117 } 118 public void onCompleted() { 119 Stat maxHpStat = GameFactory.createStat(maxHpMonsterName.get(), maxHp.get(), maxHpCount.get()); 120 // Send max hit points first. 121 responseObserver.onNext(maxHpStat); 122 if (includeMin) { 123 // Send min hit points. 124 Stat minHpStat = GameFactory.createStat(minHpMonsterName.get(), minHp.get(), minHpCount.get()); 125 responseObserver.onNext(minHpStat); 126 } 127 responseObserver.onCompleted(); 128 } 129 public void onError(Throwable t) { 130 // Not expected 131 Assert.fail(); 132 }; 133 }; 134 } 135 } 136 137 @org.junit.BeforeClass 138 public static void startServer() throws IOException { 139 server = ServerBuilder.forPort(0).addService(new MyService()).build().start(); 140 int port = server.getPort(); 141 channel = ManagedChannelBuilder.forAddress("localhost", port) 142 // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid 143 // needing certificates. 144 .usePlaintext(true) 145 .directExecutor() 146 .build(); 147 blockingStub = MonsterStorageGrpc.newBlockingStub(channel); 148 asyncStub = MonsterStorageGrpc.newStub(channel); 149 } 150 151 @org.junit.Test 152 public void testUnary() throws IOException { 153 Monster monsterRequest = GameFactory.createMonster(BIG_MONSTER_NAME, nestedMonsterHp, nestedMonsterMana); 154 Stat stat = blockingStub.store(monsterRequest); 155 Assert.assertEquals(stat.id(), "Hello " + BIG_MONSTER_NAME); 156 System.out.println("Received stat response from service: " + stat.id()); 157 } 158 159 @org.junit.Test 160 public void testServerStreaming() throws IOException { 161 Monster monsterRequest = GameFactory.createMonster(BIG_MONSTER_NAME, nestedMonsterHp, nestedMonsterMana); 162 Stat stat = blockingStub.store(monsterRequest); 163 Iterator<Monster> iterator = blockingStub.retrieve(stat); 164 int counter = 0; 165 while(iterator.hasNext()) { 166 Monster m = iterator.next(); 167 System.out.println("Received monster " + m.name()); 168 counter ++; 169 } 170 Assert.assertEquals(counter, numStreamedMsgs); 171 System.out.println("FlatBuffers GRPC client/server test: completed successfully"); 172 } 173 174 @org.junit.Test 175 public void testClientStreaming() throws IOException, InterruptedException { 176 final AtomicReference<Stat> maxHitStat = new AtomicReference<Stat>(); 177 final CountDownLatch streamAlive = new CountDownLatch(1); 178 179 StreamObserver<Stat> statObserver = new StreamObserver<Stat>() { 180 public void onCompleted() { 181 streamAlive.countDown(); 182 } 183 public void onError(Throwable ex) { } 184 public void onNext(Stat stat) { 185 maxHitStat.set(stat); 186 } 187 }; 188 StreamObserver<Monster> monsterStream = asyncStub.getMaxHitPoint(statObserver); 189 short count = 10; 190 for (short i = 0;i < count; ++i) { 191 Monster monster = GameFactory.createMonster(BIG_MONSTER_NAME + i, (short) (nestedMonsterHp * i), nestedMonsterMana); 192 monsterStream.onNext(monster); 193 } 194 monsterStream.onCompleted(); 195 // Wait a little bit for the server to send the stats of the monster with the max hit-points. 196 streamAlive.await(timeoutMs, TimeUnit.MILLISECONDS); 197 Assert.assertEquals(maxHitStat.get().id(), BIG_MONSTER_NAME + (count - 1)); 198 Assert.assertEquals(maxHitStat.get().val(), nestedMonsterHp * (count - 1)); 199 Assert.assertEquals(maxHitStat.get().count(), 1); 200 } 201 202 @org.junit.Test 203 public void testBiDiStreaming() throws IOException, InterruptedException { 204 final AtomicReference<Stat> maxHitStat = new AtomicReference<Stat>(); 205 final AtomicReference<Stat> minHitStat = new AtomicReference<Stat>(); 206 final CountDownLatch streamAlive = new CountDownLatch(1); 207 208 StreamObserver<Stat> statObserver = new StreamObserver<Stat>() { 209 public void onCompleted() { 210 streamAlive.countDown(); 211 } 212 public void onError(Throwable ex) { } 213 public void onNext(Stat stat) { 214 // We expect the server to send the max stat first and then the min stat. 215 if (maxHitStat.get() == null) { 216 maxHitStat.set(stat); 217 } 218 else { 219 minHitStat.set(stat); 220 } 221 } 222 }; 223 StreamObserver<Monster> monsterStream = asyncStub.getMinMaxHitPoints(statObserver); 224 short count = 10; 225 for (short i = 0;i < count; ++i) { 226 Monster monster = GameFactory.createMonster(BIG_MONSTER_NAME + i, (short) (nestedMonsterHp * i), nestedMonsterMana); 227 monsterStream.onNext(monster); 228 } 229 monsterStream.onCompleted(); 230 231 // Wait a little bit for the server to send the stats of the monster with the max and min hit-points. 232 streamAlive.await(timeoutMs, TimeUnit.MILLISECONDS); 233 234 Assert.assertEquals(maxHitStat.get().id(), BIG_MONSTER_NAME + (count - 1)); 235 Assert.assertEquals(maxHitStat.get().val(), nestedMonsterHp * (count - 1)); 236 Assert.assertEquals(maxHitStat.get().count(), 1); 237 238 Assert.assertEquals(minHitStat.get().id(), BIG_MONSTER_NAME + 0); 239 Assert.assertEquals(minHitStat.get().val(), nestedMonsterHp * 0); 240 Assert.assertEquals(minHitStat.get().count(), 1); 241 } 242 } 243