1 /*
<lambda>null2  * Copyright 2018 Google Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     https://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package trebuchet.util
18 
19 import java.io.Closeable
20 import java.util.*
21 import java.util.concurrent.ArrayBlockingQueue
22 import java.util.concurrent.Callable
23 import java.util.concurrent.CompletableFuture
24 import java.util.concurrent.Executors
25 import java.util.concurrent.Future
26 import java.util.concurrent.atomic.AtomicBoolean
27 import java.util.concurrent.atomic.AtomicLong
28 import kotlin.collections.ArrayList
29 import kotlin.concurrent.thread
30 import kotlin.system.measureNanoTime
31 import kotlin.system.measureTimeMillis
32 
/**
 * Converts a duration in nanoseconds to whole milliseconds.
 *
 * Uses integer division, so any sub-millisecond remainder is discarded
 * (the result is truncated toward zero).
 */
fun ns2ms(ns: Long): Long {
    val nanosPerMilli = 1_000_000L
    return ns / nanosPerMilli
}
34 
/**
 * A FIFO queue of [Runnable] tasks that is drained by whichever threads call [processAll].
 *
 * All access to the backing deque is guarded by `synchronized` on the deque itself; tasks are
 * executed outside of that lock. Idle consumers busy-wait (spin on [System.nanoTime]) instead of
 * blocking, so a thread inside [processAll] keeps a core busy until [finish] is called.
 */
class WorkQueue {
    private val pending = ArrayDeque<Runnable>(8192)
    private val done = AtomicBoolean(false)

    /** Signals consumers to stop waiting for new work once the queue has been drained. */
    fun finish() {
        done.set(true)
    }

    /** Appends [task] to the tail of the queue. */
    fun submit(task: Runnable) {
        synchronized(pending) { pending.add(task) }
    }

    /**
     * Runs queued tasks on the calling thread until [finish] has been called, then runs whatever
     * is still queued and returns.
     *
     * An exception thrown by a task is not caught here and propagates to the caller.
     */
    fun processAll() {
        while (!done.get()) {
            while (true) {
                var task = pollLocked()
                if (task == null) {
                    // Queue looked empty: pause briefly and look once more before backing off.
                    busyWait(10000)
                    task = pollLocked()
                }
                if (task == null) break
                task.run()
            }
            // Nothing arrived during the short retry; back off a little longer.
            busyWait(60000)
        }

        // finish() was observed: run anything that is still queued before returning.
        while (true) {
            val leftover = pollLocked() ?: break
            leftover.run()
        }
    }

    /** Removes and returns the head of the queue under the lock, or null if it is empty. */
    private fun pollLocked(): Runnable? = synchronized(pending) { pending.poll() }

    /** Spins on the calling thread for roughly [nanos] nanoseconds. */
    private fun busyWait(nanos: Long) {
        val deadline = System.nanoTime() + nanos
        while (System.nanoTime() < deadline) {
            // intentionally empty: spin
        }
    }
}
80 
/** Number of worker threads per [WorkPool]: one per available processor, capped at 10. */
private val ThreadCount = Runtime.getRuntime().availableProcessors().coerceAtMost(10)

/**
 * A fixed group of [ThreadCount] worker threads that all drain a single shared [WorkQueue].
 *
 * The workers are created and started when the pool is constructed. [close] tells the queue to
 * finish and then waits for every worker to exit.
 */
class WorkPool : Closeable {

    private val workQueue = WorkQueue()
    private var closed = false
    private val threadPool = Array(ThreadCount) {
        val worker = Thread { workQueue.processAll() }
        worker.start()
        worker
    }

    /** Queues [task] to be run by one of the worker threads. */
    fun submit(task: Runnable) = workQueue.submit(task)

    /**
     * Marks the queue as finished and joins all worker threads.
     * Calls after the first one return immediately.
     */
    override fun close() {
        if (!closed) {
            closed = true
            workQueue.finish()
            for (worker in threadPool) {
                worker.join()
            }
        }
    }
}
102 
/**
 * Maps the elements of [iterator] through [map] on a [WorkPool], returning the results in input
 * order.
 *
 * A background producer thread reads [iterator] in chunks of up to [chunkSize] elements and
 * submits each chunk to the pool as one task. Results are handed to the returned iterator through
 * a bounded queue (1024 chunks), so the producer blocks when the consumer falls that far behind.
 *
 * @param iterator source elements; it is read only from the background producer thread.
 * @param threadState factory for the per-thread state passed to [map]; it is invoked lazily, at
 *   most once per worker thread, through a [ThreadLocal].
 * @param chunkSize maximum number of elements per task; must be positive.
 * @param map transformation applied to every element together with the calling worker's state.
 * @return an iterator over the mapped values. Constructing it blocks until the first chunk (or
 *   the end of the stream) is available, and [Iterator.next] may block while the following
 *   chunk is prefetched.
 * @throws IllegalArgumentException if [chunkSize] is not positive.
 *
 * If [map], [threadState] or [iterator] throws, the failure is rethrown to the consumer wrapped
 * in a [java.util.concurrent.ExecutionException] (from this function or from [Iterator.next])
 * when the affected chunk is fetched, instead of leaving the consumer blocked forever.
 */
fun<T, U, S> par_map(iterator: Iterator<T>, threadState: () -> S, chunkSize: Int = 50,
                     map: (S, T) -> U): Iterator<U> {
    // A non-positive chunk size would yield empty chunks forever (0) or make ArrayList throw on
    // the producer thread (negative); reject it up front on the caller's thread.
    require(chunkSize > 0) { "chunkSize must be positive, was $chunkSize" }

    val endOfStreamMarker = CompletableFuture<List<U>>()
    val resultPipe = ArrayBlockingQueue<Future<List<U>>>(1024, false)
    thread {
        val threadLocal = ThreadLocal.withInitial { threadState() }
        try {
            WorkPool().use { pool ->
                while (iterator.hasNext()) {
                    val source = ArrayList<T>(chunkSize)
                    while (source.size < chunkSize && iterator.hasNext()) {
                        source.add(iterator.next())
                    }
                    val future = CompletableFuture<List<U>>()
                    pool.submit(Runnable {
                        try {
                            val state = threadLocal.get()
                            future.complete(source.map { map(state, it) })
                        } catch (t: Throwable) {
                            // Always complete the future: an uncompleted one would block the
                            // consumer forever and the exception would kill this worker thread.
                            future.completeExceptionally(t)
                        }
                    })
                    // Enqueue in submission order so the consumer sees results in input order.
                    resultPipe.put(future)
                }
            }
        } catch (t: Throwable) {
            // The source iterator (or the pool) failed: surface that to the consumer.
            val failed = CompletableFuture<List<U>>()
            failed.completeExceptionally(t)
            resultPipe.put(failed)
        } finally {
            // Always terminate the stream so the consumer can never wait on an empty pipe.
            resultPipe.put(endOfStreamMarker)
        }
    }

    return object : Iterator<U> {
        // Iterator over the chunk currently being consumed; null once the stream has ended.
        private var current: Iterator<U>? = null

        init {
            takeNext()
        }

        // Blocks for the next chunk; leaves `current` null when the end marker is reached.
        private fun takeNext() {
            val next = resultPipe.take()
            if (next !== endOfStreamMarker) {
                current = next.get().iterator()
            }
        }

        override fun next(): U {
            val chunk = current ?: throw NoSuchElementException()
            val ret = chunk.next()
            if (!chunk.hasNext()) {
                // Prefetch eagerly so hasNext() stays a simple null check.
                current = null
                takeNext()
            }
            return ret
        }

        override fun hasNext(): Boolean {
            return current != null
        }

    }
}