use super::plumbing::*;
use super::*;
use std::cell::Cell;
use std::sync::atomic::{AtomicUsize, Ordering};

#[cfg(test)]
mod test;

// The key optimization for find_first is that a consumer can stop its search if
// some consumer to its left already found a match (and similarly for consumers
// to the right for find_last). To make this work, all consumers need some
// notion of their position in the data relative to other consumers, including
// unindexed consumers that have no built-in notion of position.
//
// To solve this, we assign each consumer a lower and upper bound for an
// imaginary "range" of data that it consumes. The initial consumer starts with
// the range 0..usize::max_value(). The split divides this range in half so that
// one resulting consumer has the range 0..(usize::max_value() / 2), and the
// other has (usize::max_value() / 2)..usize::max_value(). Every subsequent
// split divides the range in half again until it cannot be split anymore
// (i.e. its length is 1), in which case the split returns two consumers with
// the same range. In that case both consumers will continue to consume all
// their data regardless of whether a better match is found, but the reducer
// will still return the correct answer.

#[derive(Copy, Clone)]
enum MatchPosition {
    Leftmost,
    Rightmost,
}

/// Returns true if pos1 is a better match than pos2 according to MatchPosition
#[inline]
fn better_position(pos1: usize, pos2: usize, mp: MatchPosition) -> bool {
    match mp {
        MatchPosition::Leftmost => pos1 < pos2,
        MatchPosition::Rightmost => pos1 > pos2,
    }
}

pub(super) fn find_first<I, P>(pi: I, find_op: P) -> Option<I::Item>
where
    I: ParallelIterator,
    P: Fn(&I::Item) -> bool + Sync,
{
    let best_found = AtomicUsize::new(usize::max_value());
    let consumer = FindConsumer::new(&find_op, MatchPosition::Leftmost, &best_found);
    pi.drive_unindexed(consumer)
}

pub(super) fn find_last<I, P>(pi: I, find_op: P) -> Option<I::Item>
where
    I: ParallelIterator,
    P: Fn(&I::Item) -> bool + Sync,
{
    let best_found = AtomicUsize::new(0);
    let consumer = FindConsumer::new(&find_op, MatchPosition::Rightmost, &best_found);
    pi.drive_unindexed(consumer)
}

struct FindConsumer<'p, P> {
    find_op: &'p P,
    lower_bound: Cell<usize>,
    upper_bound: usize,
    match_position: MatchPosition,
    best_found: &'p AtomicUsize,
}

impl<'p, P> FindConsumer<'p, P> {
    fn new(find_op: &'p P, match_position: MatchPosition, best_found: &'p AtomicUsize) -> Self {
        FindConsumer {
            find_op,
            lower_bound: Cell::new(0),
            upper_bound: usize::max_value(),
            match_position,
            best_found,
        }
    }

    fn current_index(&self) -> usize {
        match self.match_position {
            MatchPosition::Leftmost => self.lower_bound.get(),
            MatchPosition::Rightmost => self.upper_bound,
        }
    }
}

impl<'p, T, P> Consumer<T> for FindConsumer<'p, P>
where
    T: Send,
    P: Fn(&T) -> bool + Sync,
{
    type Folder = FindFolder<'p, T, P>;
    type Reducer = FindReducer;
    type Result = Option<T>;

    fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
        let dir = self.match_position;
        (
            self.split_off_left(),
            self,
            FindReducer {
                match_position: dir,
            },
        )
    }

    fn into_folder(self) -> Self::Folder {
        FindFolder {
            find_op: self.find_op,
            boundary: self.current_index(),
            match_position: self.match_position,
            best_found: self.best_found,
            item: None,
        }
    }

    fn full(&self) -> bool {
        // can stop consuming if the best found index so far is *strictly*
        // better than anything this consumer will find
        better_position(
            self.best_found.load(Ordering::Relaxed),
            self.current_index(),
            self.match_position,
        )
    }
}

impl<'p, T, P> UnindexedConsumer<T> for FindConsumer<'p, P>
where
    T: Send,
    P: Fn(&T) -> bool + Sync,
{
    fn split_off_left(&self) -> Self {
        // Upper bound for one consumer will be lower bound for the other. This
        // overlap is okay, because only one of the bounds will be used for
        // comparing against best_found; the other is kept only to be able to
        // divide the range in half.
        //
        // When the resolution of usize has been exhausted (i.e. when
        // upper_bound = lower_bound), both results of this split will have the
        // same range. When that happens, we lose the ability to tell one
        // consumer to stop working when the other finds a better match, but the
        // reducer ensures that the best answer is still returned (see the test
        // above).
        let old_lower_bound = self.lower_bound.get();
        let median = old_lower_bound + ((self.upper_bound - old_lower_bound) / 2);
        self.lower_bound.set(median);

        FindConsumer {
            find_op: self.find_op,
            lower_bound: Cell::new(old_lower_bound),
            upper_bound: median,
            match_position: self.match_position,
            best_found: self.best_found,
        }
    }

    fn to_reducer(&self) -> Self::Reducer {
        FindReducer {
            match_position: self.match_position,
        }
    }
}

struct FindFolder<'p, T, P> {
    find_op: &'p P,
    boundary: usize,
    match_position: MatchPosition,
    best_found: &'p AtomicUsize,
    item: Option<T>,
}

impl<'p, P: 'p + Fn(&T) -> bool, T> Folder<T> for FindFolder<'p, T, P> {
    type Result = Option<T>;

    fn consume(mut self, item: T) -> Self {
        let found_best_in_range = match self.match_position {
            MatchPosition::Leftmost => self.item.is_some(),
            MatchPosition::Rightmost => false,
        };

        if !found_best_in_range && (self.find_op)(&item) {
            // Continuously try to set best_found until we succeed or we
            // discover a better match was already found.
            let mut current = self.best_found.load(Ordering::Relaxed);
            loop {
                if better_position(current, self.boundary, self.match_position) {
                    break;
                }
                match self.best_found.compare_exchange_weak(
                    current,
                    self.boundary,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        self.item = Some(item);
                        break;
                    }
                    Err(v) => current = v,
                }
            }
        }
        self
    }

    fn complete(self) -> Self::Result {
        self.item
    }

    fn full(&self) -> bool {
        let found_best_in_range = match self.match_position {
            MatchPosition::Leftmost => self.item.is_some(),
            MatchPosition::Rightmost => false,
        };

        found_best_in_range
            || better_position(
                self.best_found.load(Ordering::Relaxed),
                self.boundary,
                self.match_position,
            )
    }
}

struct FindReducer {
    match_position: MatchPosition,
}

impl<T> Reducer<Option<T>> for FindReducer {
    fn reduce(self, left: Option<T>, right: Option<T>) -> Option<T> {
        match self.match_position {
            MatchPosition::Leftmost => left.or(right),
            MatchPosition::Rightmost => right.or(left),
        }
    }
}