// Copyright 2023 Google LLC
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Modified from BSD-licensed code
// Copyright (c) the JPEG XL Project Authors. All rights reserved.
// See https://github.com/libjxl/libjxl/blob/main/LICENSE.

#ifndef HIGHWAY_HWY_CONTRIB_THREAD_POOL_THREAD_POOL_H_
#define HIGHWAY_HWY_CONTRIB_THREAD_POOL_THREAD_POOL_H_

#include <stddef.h>
#include <stdint.h>
#include <stdio.h>  // snprintf

#include <array>
#include <atomic>
#include <string>
#include <thread>  // NOLINT
#include <vector>

#include "hwy/aligned_allocator.h"  // HWY_ALIGNMENT
#include "hwy/auto_tune.h"
#include "hwy/base.h"
#include "hwy/cache_control.h"  // Pause
#include "hwy/contrib/thread_pool/futex.h"
#include "hwy/contrib/thread_pool/spin.h"
#include "hwy/contrib/thread_pool/topology.h"
#include "hwy/profiler.h"
#include "hwy/stats.h"
#include "hwy/timer.h"

// Define to HWY_NOINLINE to see profiles of `WorkerRun*` and waits.
#define HWY_POOL_PROFILE

namespace hwy {

// Sets the name of the current thread to the format string `format`, which must
// include %d for `thread`. Currently only implemented for pthreads (*nix and
// OSX); Windows involves throwing an exception.
static inline void SetThreadName(const char* format, int thread) {
  char buf[16] = {};  // Linux limit, including \0
  const int chars_written = snprintf(buf, sizeof(buf), format, thread);
  HWY_ASSERT(0 < chars_written &&
             chars_written <= static_cast<int>(sizeof(buf) - 1));

#if (HWY_OS_LINUX && (!defined(__ANDROID__) || __ANDROID_API__ >= 19)) || \
    HWY_OS_FREEBSD
  // Note that FreeBSD pthread_set_name_np does not return a value (#2669).
  HWY_ASSERT(0 == pthread_setname_np(pthread_self(), buf));
#elif HWY_OS_APPLE
  // Different interface: single argument, current thread only.
  HWY_ASSERT(0 == pthread_setname_np(buf));
#endif
}

// Whether workers should block or spin.
enum class PoolWaitMode : uint8_t { kBlock = 1, kSpin };

namespace pool {

#ifndef HWY_POOL_VERBOSITY
#define HWY_POOL_VERBOSITY 0
#endif

static constexpr int kVerbosity = HWY_POOL_VERBOSITY;

// Some CPUs already have more than this many threads, but rather than one
// large pool, we assume applications create multiple pools, ideally per
// cluster (cores sharing a cache), because this improves locality and barrier
// latency. In that case, this is a generous upper bound.
static constexpr size_t kMaxThreads = 63;

// Generates a random permutation of [0, size). O(1) storage.
class ShuffledIota {
 public:
  ShuffledIota() : coprime_(1) {}  // for Worker
  explicit ShuffledIota(uint32_t coprime) : coprime_(coprime) {}

  // Returns the next after `current`, using an LCG-like generator.
  uint32_t Next(uint32_t current, const Divisor64& divisor) const {
    HWY_DASSERT(current < divisor.GetDivisor());
    // (coprime * i + current) % size, see https://lemire.me/blog/2017/09/18/.
    return static_cast<uint32_t>(divisor.Remainder(current + coprime_));
  }

  // Returns true if a and b have no common denominator except 1. Based on
  // binary GCD. Assumes a and b are nonzero. Also used in tests.
  static bool CoprimeNonzero(uint32_t a, uint32_t b) {
    const size_t trailing_a = Num0BitsBelowLS1Bit_Nonzero32(a);
    const size_t trailing_b = Num0BitsBelowLS1Bit_Nonzero32(b);
    // If both have at least one trailing zero, they are both divisible by 2.
    if (HWY_MIN(trailing_a, trailing_b) != 0) return false;

    // If one of them has a trailing zero, shift it out.
    a >>= trailing_a;
    b >>= trailing_b;

    for (;;) {
      // Swap such that a >= b.
      const uint32_t tmp_a = a;
      a = HWY_MAX(tmp_a, b);
      b = HWY_MIN(tmp_a, b);

      // When the smaller number is 1, they were coprime.
      if (b == 1) return true;

      a -= b;
      // a == b means there was a common factor, so not coprime.
      if (a == 0) return false;
      a >>= Num0BitsBelowLS1Bit_Nonzero32(a);
    }
  }

  // Returns another coprime >= `start`, or 1 for small `size`.
  // Used to seed independent ShuffledIota instances.
  static uint32_t FindAnotherCoprime(uint32_t size, uint32_t start) {
    if (size <= 2) {
      return 1;
    }

    // Avoids even x for even sizes, which are sure to be rejected.
    const uint32_t inc = (size & 1) ? 1 : 2;

    for (uint32_t x = start | 1; x < start + size * 16; x += inc) {
      if (CoprimeNonzero(x, static_cast<uint32_t>(size))) {
        return x;
      }
    }

    HWY_UNREACHABLE;
  }

  uint32_t coprime_;
};

// 'Policies' suitable for various worker counts and locality. To define a
// new class, add an enum and update `ToString` plus `FunctorAddWait`. The
// enumerators must be contiguous so we can iterate over them.
enum class WaitType : uint8_t {
  kBlock,
  kSpin1,
  kSpinSeparate,
  kSentinel  // Must be last.
};
enum class BarrierType : uint8_t {
  kOrdered,
  kCounter1,
  kCounter2,
  kCounter4,
  kGroup2,
  kGroup4,
  kSentinel  // Must be last.
};

// For printing which is in use.
static inline const char* ToString(WaitType type) {
  switch (type) {
    case WaitType::kBlock:
      return "Block";
    case WaitType::kSpin1:
      return "Single";
    case WaitType::kSpinSeparate:
      return "Separate";
    case WaitType::kSentinel:
      return nullptr;
    default:
      HWY_UNREACHABLE;
  }
}

static inline const char* ToString(BarrierType type) {
  switch (type) {
    case BarrierType::kOrdered:
      return "Ordered";
    case BarrierType::kCounter1:
      return "Counter1";
    case BarrierType::kCounter2:
      return "Counter2";
    case BarrierType::kCounter4:
      return "Counter4";
    case BarrierType::kGroup2:
      return "Group2";
    case BarrierType::kGroup4:
      return "Group4";
    case BarrierType::kSentinel:
      return nullptr;
    default:
      HWY_UNREACHABLE;
  }
}

// We want predictable struct/class sizes so we can reason about cache lines.
#pragma pack(push, 1)

// Parameters governing the main and worker thread behavior. Can be updated at
// runtime via `SetWaitMode`. Both have copies which are carefully synchronized
// (two-phase barrier). 64-bit allows adding fields (e.g. for load-balancing)
// without having to bit-pack members, and is fine because this is only moved
// with relaxed stores, hence we do not have to fit it in the 32 futex bits.
class Config {  // 8 bytes
 public:
  static std::vector<Config> AllCandidates(PoolWaitMode wait_mode,
                                           size_t num_threads) {
    std::vector<SpinType> spin_types(size_t{1}, DetectSpin());
    // Monitor-based spin may be slower, so also try Pause.
    if (spin_types[0] != SpinType::kPause) {
      spin_types.push_back(SpinType::kPause);
    }

    std::vector<WaitType> wait_types;
    if (wait_mode == PoolWaitMode::kSpin) {
      // All except `kBlock`.
      for (size_t wait = 0;; ++wait) {
        const WaitType wait_type = static_cast<WaitType>(wait);
        if (wait_type == WaitType::kSentinel) break;
        if (wait_type != WaitType::kBlock) wait_types.push_back(wait_type);
      }
    } else {
      wait_types.push_back(WaitType::kBlock);
    }

    std::vector<BarrierType> barrier_types;
    // Note that casting an integer is UB if there is no matching enumerator,
    // but we define a sentinel to prevent this.
    for (size_t barrier = 0;; ++barrier) {
      const BarrierType barrier_type = static_cast<BarrierType>(barrier);
      if (barrier_type == BarrierType::kSentinel) break;
      // If <= 2 workers, group size of 4 is the same as 2.
      if (num_threads <= 1 && barrier_type == BarrierType::kCounter4) continue;
      if (num_threads <= 1 && barrier_type == BarrierType::kGroup4) continue;
      barrier_types.push_back(barrier_type);
    }

    std::vector<Config> candidates;
    candidates.reserve(50);
    for (const SpinType spin_type : spin_types) {
      for (const WaitType wait_type : wait_types) {
        for (const BarrierType barrier_type : barrier_types) {
          candidates.emplace_back(spin_type, wait_type, barrier_type);
        }
      }
    }
    return candidates;
  }

  std::string ToString() const {
    char buf[128];
    snprintf(buf, sizeof(buf), "%14s %9s %9s", hwy::ToString(spin_type),
             pool::ToString(wait_type), pool::ToString(barrier_type));
    return buf;
  }

  Config() {}
  Config(SpinType spin_type, WaitType wait_type, BarrierType barrier_type)
      : spin_type(spin_type),
        wait_type(wait_type),
        barrier_type(barrier_type),
        exit(false) {}

  SpinType spin_type;
  WaitType wait_type;
  BarrierType barrier_type;
  bool exit;
  uint32_t reserved = 0;
};
static_assert(sizeof(Config) == 8, "");

// Per-worker state used by both main and worker threads. `ThreadFunc`
// (threads) and `ThreadPool` (main) have a few additional members of their own.
class alignas(HWY_ALIGNMENT) Worker {  // HWY_ALIGNMENT bytes
  static constexpr size_t kMaxVictims = 4;

  static constexpr auto kAcq = std::memory_order_acquire;
  static constexpr auto kRel = std::memory_order_release;

 public:
  Worker(const size_t worker, const size_t num_threads,
         const Divisor64& div_workers)
      : worker_(worker), num_threads_(num_threads), workers_(this - worker) {
    (void)padding_;

    HWY_DASSERT(IsAligned(this, HWY_ALIGNMENT));
    HWY_DASSERT(worker <= num_threads);
    const size_t num_workers = static_cast<size_t>(div_workers.GetDivisor());
    num_victims_ = static_cast<uint32_t>(HWY_MIN(kMaxVictims, num_workers));

    // Increase gap between coprimes to reduce collisions.
    const uint32_t coprime = ShuffledIota::FindAnotherCoprime(
        static_cast<uint32_t>(num_workers),
        static_cast<uint32_t>((worker + 1) * 257 + worker * 13));
    const ShuffledIota shuffled_iota(coprime);

    // To simplify `WorkerRun`, this worker is the first to 'steal' from.
    victims_[0] = static_cast<uint32_t>(worker);
    for (uint32_t i = 1; i < num_victims_; ++i) {
      victims_[i] = shuffled_iota.Next(victims_[i - 1], div_workers);
      HWY_DASSERT(victims_[i] != worker);
    }
  }

  // Placement-newed by `WorkerLifecycle`, we do not expect any copying.
  Worker(const Worker&) = delete;
  Worker& operator=(const Worker&) = delete;

  size_t Index() const { return worker_; }
  Worker* AllWorkers() { return workers_; }
  const Worker* AllWorkers() const { return workers_; }
  size_t NumThreads() const { return num_threads_; }

  // ------------------------ Per-worker storage for `SendConfig`

  Config LatchedConfig() const { return latched_; }
  // For workers, but no harm if also called by main thread.
  void LatchConfig(Config copy) { latched_ = copy; }

  // ------------------------ Task assignment

  // Called from the main thread.
  void SetRange(const uint64_t begin, const uint64_t end) {
    my_begin_.store(begin, kRel);
    my_end_.store(end, kRel);
  }

  uint64_t MyEnd() const { return my_end_.load(kAcq); }

  Span<const uint32_t> Victims() const {
    return hwy::Span<const uint32_t>(victims_.data(),
                                     static_cast<size_t>(num_victims_));
  }

  // Returns the next task to execute. If >= MyEnd(), it must be skipped.
  uint64_t WorkerReserveTask() {
    // TODO(janwas): replace with cooperative work-stealing.
    return my_begin_.fetch_add(1, std::memory_order_relaxed);
  }

  // ------------------------ Waiter: Threads wait for tasks

  // WARNING: some `Wait*` do not set this for all Worker instances. For
  // example, `WaitType::kBlock` only uses the first worker's `Waiter` because
  // one futex can wake multiple waiters. Hence we never load this directly
  // without going through `Wait*` policy classes, and must ensure all threads
  // use the same wait mode.

  const std::atomic<uint32_t>& Waiter() const { return wait_epoch_; }
  std::atomic<uint32_t>& MutableWaiter() { return wait_epoch_; }  // futex
  void StoreWaiter(uint32_t epoch) { wait_epoch_.store(epoch, kRel); }

  // ------------------------ Barrier: Main thread waits for workers

  const std::atomic<uint32_t>& Barrier() const { return barrier_epoch_; }
  std::atomic<uint32_t>& MutableBarrier() { return barrier_epoch_; }
  void StoreBarrier(uint32_t epoch) { barrier_epoch_.store(epoch, kRel); }

 private:
  // Atomics first because arm7 clang otherwise makes them unaligned.

  // Set by `SetRange`:
  alignas(8) std::atomic<uint64_t> my_begin_;
  alignas(8) std::atomic<uint64_t> my_end_;

  // Use u32 to match futex.h.
  alignas(4) std::atomic<uint32_t> wait_epoch_{0};
  alignas(4) std::atomic<uint32_t> barrier_epoch_{0};  // is reset

  uint32_t num_victims_;  // <= kPoolMaxVictims
  std::array<uint32_t, kMaxVictims> victims_;

  // Written and read by the same thread, hence not atomic.
  Config latched_;

  const size_t worker_;
  const size_t num_threads_;
  Worker* const workers_;

  uint8_t padding_[HWY_ALIGNMENT - 64 - sizeof(victims_)];
};
static_assert(sizeof(Worker) == HWY_ALIGNMENT, "");

#pragma pack(pop)

// Creates/destroys `Worker` using preallocated storage. See comment at
// `ThreadPool::worker_bytes_` for why we do not dynamically allocate.
class WorkerLifecycle {  // 0 bytes
 public:
  // Placement new for `Worker` into `storage` because its ctor requires
  // the worker index. Returns array of all workers.
  static Worker* Init(uint8_t* storage, size_t num_threads,
                      const Divisor64& div_workers) {
    Worker* workers = new (storage) Worker(0, num_threads, div_workers);
    for (size_t worker = 1; worker <= num_threads; ++worker) {
      new (Addr(storage, worker)) Worker(worker, num_threads, div_workers);
      // Ensure pointer arithmetic is the same (will be used in Destroy).
      HWY_DASSERT(reinterpret_cast<uintptr_t>(workers + worker) ==
                  reinterpret_cast<uintptr_t>(Addr(storage, worker)));
    }

    // Publish non-atomic stores in `workers`.
    std::atomic_thread_fence(std::memory_order_release);

    return workers;
  }

  static void Destroy(Worker* workers, size_t num_threads) {
    for (size_t worker = 0; worker <= num_threads; ++worker) {
      workers[worker].~Worker();
    }
  }

 private:
  static uint8_t* Addr(uint8_t* storage, size_t worker) {
    return storage + worker * sizeof(Worker);
  }
};

#pragma pack(push, 1)
// Stores arguments to `Run`: the function and range of task indices. Set by
// the main thread, read by workers including the main thread.
class alignas(8) Tasks {
  static constexpr auto kAcq = std::memory_order_acquire;

  // Signature of the (internal) function called from workers(s) for each
  // `task` in the [`begin`, `end`) passed to Run(). Closures (lambdas) do not
  // receive the first argument, which points to the lambda object.
  typedef void (*RunFunc)(const void* opaque, uint64_t task, size_t worker);

 public:
  Tasks() { HWY_DASSERT(IsAligned(this, 8)); }

  template <class Closure>
  void Set(uint64_t begin, uint64_t end, const Closure& closure) {
    constexpr auto kRel = std::memory_order_release;
    // `TestTasks` and `SetWaitMode` call this with `begin == end`.
    HWY_DASSERT(begin <= end);
    begin_.store(begin, kRel);
    end_.store(end, kRel);
    func_.store(static_cast<RunFunc>(&CallClosure<Closure>), kRel);
    opaque_.store(reinterpret_cast<const void*>(&closure), kRel);
  }

  // Assigns workers their share of `[begin, end)`. Called from the main
  // thread; workers are initializing or spinning for a command.
  static void DivideRangeAmongWorkers(const uint64_t begin, const uint64_t end,
                                      const Divisor64& div_workers,
                                      Worker* workers) {
    const size_t num_workers = static_cast<size_t>(div_workers.GetDivisor());
    HWY_DASSERT(num_workers > 1);  // Else Run() runs on the main thread.
    HWY_DASSERT(begin <= end);
    const size_t num_tasks = static_cast<size_t>(end - begin);

    // Assigning all remainders to the last worker causes imbalance. We instead
    // give one more to each worker whose index is less. This may be zero when
    // called from `TestTasks`.
    const size_t min_tasks = static_cast<size_t>(div_workers.Divide(num_tasks));
    const size_t remainder =
        static_cast<size_t>(div_workers.Remainder(num_tasks));

    uint64_t my_begin = begin;
    for (size_t worker = 0; worker < num_workers; ++worker) {
      const uint64_t my_end = my_begin + min_tasks + (worker < remainder);
      workers[worker].SetRange(my_begin, my_end);
      my_begin = my_end;
    }
    HWY_DASSERT(my_begin == end);
  }

  // Runs the worker's assigned range of tasks, plus work stealing if needed.
  HWY_POOL_PROFILE void WorkerRun(Worker* worker) const {
    if (NumTasks() > worker->NumThreads() + 1) {
      WorkerRunWithStealing(worker);
    } else {
      WorkerRunSingle(worker->Index());
    }
  }

 private:
  // Special case for <= 1 task per worker, where stealing is unnecessary.
  void WorkerRunSingle(size_t worker) const {
    const uint64_t begin = begin_.load(kAcq);
    const uint64_t end = end_.load(kAcq);
    HWY_DASSERT(begin <= end);

    const uint64_t task = begin + worker;
    // We might still have more workers than tasks, so check first.
    if (HWY_LIKELY(task < end)) {
      const void* opaque = Opaque();
      const RunFunc func = Func();
      func(opaque, task, worker);
    }
  }

  // Must be called for each `worker` in [0, num_workers).
  //
  // A prior version of this code attempted to assign only as much work as a
  // worker will actually use. As with OpenMP's 'guided' strategy, we assigned
  // remaining/(k*num_threads) in each iteration. Although the worst-case
  // imbalance is bounded, this required several rounds of work allocation, and
  // the atomic counter did not scale to > 30 threads.
  //
  // We now use work stealing instead, where already-finished workers look for
  // and perform work from others, as if they were that worker. This deals with
  // imbalances as they arise, but care is required to reduce contention. We
  // randomize the order in which threads choose victims to steal from.
  HWY_POOL_PROFILE void WorkerRunWithStealing(Worker* worker) const {
    Worker* workers = worker->AllWorkers();
    const size_t index = worker->Index();
    const RunFunc func = Func();
    const void* opaque = Opaque();

    // For each worker in random order, starting with our own, attempt to do
    // all their work.
    for (uint32_t victim : worker->Victims()) {
      Worker* other_worker = workers + victim;

      // Until all of other_worker's work is done:
      const uint64_t other_end = other_worker->MyEnd();
      for (;;) {
        // The worker that first sets `task` to `other_end` exits this loop.
        // After that, `task` can be incremented up to `num_workers - 1` times,
        // once per other worker.
        const uint64_t task = other_worker->WorkerReserveTask();
        if (HWY_UNLIKELY(task >= other_end)) {
          hwy::Pause();  // Reduce coherency traffic while stealing.
          break;
        }
        // Pass the index we are actually running on; this is important
        // because it is the TLS index for user code.
        func(opaque, task, index);
      }
    }
  }

  size_t NumTasks() const {
    return static_cast<size_t>(end_.load(kAcq) - begin_.load(kAcq));
  }

  const void* Opaque() const { return opaque_.load(kAcq); }
  RunFunc Func() const { return func_.load(kAcq); }

  // Calls closure(task, worker). Signature must match `RunFunc`.
  template <class Closure>
  static void CallClosure(const void* opaque, uint64_t task, size_t worker) {
    (*reinterpret_cast<const Closure*>(opaque))(task, worker);
  }

  std::atomic<uint64_t> begin_;
  std::atomic<uint64_t> end_;
  std::atomic<RunFunc> func_;
  std::atomic<const void*> opaque_;
};
static_assert(sizeof(Tasks) == 16 + 2 * sizeof(void*), "");
#pragma pack(pop)

// ------------------------------ Threads wait, main wakes them

// Considerations:
// - uint32_t storage per `Worker` so we can use `futex.h`.
// - avoid atomic read-modify-write. These are implemented on x86 using a LOCK
//   prefix, which interferes with other cores' cache-coherency transactions
//   and drains our core's store buffer. We use only store-release and
//   load-acquire. Although expressed using `std::atomic`, these are normal
//   loads/stores in the strong x86 memory model.
// - prefer to avoid resetting the state. "Sense-reversing" (flipping a flag)
//   would work, but we we prefer an 'epoch' counter because it is more useful
//   and easier to understand/debug, and as fast.

// Both the main thread and each worker maintain their own counter, which are
// implicitly synchronized by the barrier. To wake, the main thread does a
// store-release, and each worker does a load-acquire. The policy classes differ
// in whether they block or spin (with pause/monitor to reduce power), and
// whether workers check their own counter or a shared one.
//
// All methods are const because they only use storage in `Worker`, and we
// prefer to pass const-references to empty classes to enable type deduction.

// Futex: blocking reduces apparent CPU usage, but has higher wake latency.
struct WaitBlock {
  WaitType Type() const { return WaitType::kBlock; }

  // Wakes all workers by storing the current `epoch`.
  void WakeWorkers(Worker* workers, const uint32_t epoch) const {
    HWY_DASSERT(epoch != 0);
    workers[1].StoreWaiter(epoch);
    WakeAll(workers[1].MutableWaiter());  // futex: expensive syscall
  }

  // Waits until `WakeWorkers(_, epoch)` has been called.
  template <class Spin>
  void UntilWoken(const Worker* worker, const Spin& /*spin*/,
                  const uint32_t epoch) const {
    HWY_DASSERT(worker->Index() != 0);  // main is 0
    const Worker* workers = worker->AllWorkers();
    BlockUntilDifferent(epoch - 1, workers[1].Waiter());
  }
};

// Single u32: single store by the main thread. All worker threads poll this
// one cache line and thus have it in a shared state, which means the store
// will invalidate each of them, leading to more transactions than SpinSeparate.
struct WaitSpin1 {
  WaitType Type() const { return WaitType::kSpin1; }

  void WakeWorkers(Worker* workers, const uint32_t epoch) const {
    workers[1].StoreWaiter(epoch);
  }

  template <class Spin>
  void UntilWoken(const Worker* worker, const Spin& spin,
                  const uint32_t epoch) const {
    HWY_DASSERT(worker->Index() != 0);  // main is 0
    const Worker* workers = worker->AllWorkers();
    (void)spin.UntilEqual(epoch, workers[1].Waiter());
    // TODO: store reps in stats.
  }
};

// Separate u32 per thread: more stores for the main thread, but each worker
// only polls its own cache line, leading to fewer cache-coherency transactions.
struct WaitSpinSeparate {
  WaitType Type() const { return WaitType::kSpinSeparate; }

  void WakeWorkers(Worker* workers, const uint32_t epoch) const {
    for (size_t thread = 0; thread < workers->NumThreads(); ++thread) {
      workers[1 + thread].StoreWaiter(epoch);
    }
  }

  template <class Spin>
  void UntilWoken(const Worker* worker, const Spin& spin,
                  const uint32_t epoch) const {
    HWY_DASSERT(worker->Index() != 0);  // main is 0
    (void)spin.UntilEqual(epoch, worker->Waiter());
    // TODO: store reps in stats.
  }
};

// ------------------------------ Barrier: Main thread waits for workers

// Single atomic counter. TODO: remove if not competitive?
template <size_t kShards>
class BarrierCounter {
  static_assert(kShards == 1 || kShards == 2 || kShards == 4, "");  // pow2

 public:
  BarrierType Type() const {
    return kShards == 1   ? BarrierType::kCounter1
           : kShards == 2 ? BarrierType::kCounter2
                          : BarrierType::kCounter4;
  }

  void Reset(Worker* workers) const {
    for (size_t i = 0; i < kShards; ++i) {
      // Use last worker(s) to avoid contention with other stores to the Worker.
      // Note that there are kMaxThreads + 1 workers, hence i == 0 is the last.
      workers[kMaxThreads - i].StoreBarrier(0);
    }
  }

  template <class Spin>
  void WorkerReached(Worker* worker, const Spin& /*spin*/,
                     uint32_t /*epoch*/) const {
    Worker* workers = worker->AllWorkers();
    const size_t shard = worker->Index() & (kShards - 1);
    const auto kAcqRel = std::memory_order_acq_rel;
    workers[kMaxThreads - shard].MutableBarrier().fetch_add(1, kAcqRel);
  }

  template <class Spin>
  void UntilReached(size_t num_threads, const Worker* workers, const Spin& spin,
                    uint32_t /*epoch*/) const {
    HWY_IF_CONSTEXPR(kShards == 1) {
      (void)spin.UntilEqual(static_cast<uint32_t>(num_threads),
                            workers[kMaxThreads - 0].Barrier());
    }
    HWY_IF_CONSTEXPR(kShards == 2) {
      const auto kAcq = std::memory_order_acquire;
      for (;;) {
        hwy::Pause();
        const uint64_t sum = workers[kMaxThreads - 0].Barrier().load(kAcq) +
                             workers[kMaxThreads - 1].Barrier().load(kAcq);
        if (sum == num_threads) break;
      }
    }
    HWY_IF_CONSTEXPR(kShards == 4) {
      const auto kAcq = std::memory_order_acquire;
      for (;;) {
        hwy::Pause();
        const uint64_t sum = workers[kMaxThreads - 0].Barrier().load(kAcq) +
                             workers[kMaxThreads - 1].Barrier().load(kAcq) +
                             workers[kMaxThreads - 2].Barrier().load(kAcq) +
                             workers[kMaxThreads - 3].Barrier().load(kAcq);
        if (sum == num_threads) break;
      }
    }
  }
};

// As with the wait, a store-release of the same local epoch counter serves as a
// "have arrived" flag that does not require resetting.

// Main thread loops over each worker.
class BarrierOrdered {
 public:
  BarrierType Type() const { return BarrierType::kOrdered; }

  void Reset(Worker* /*workers*/) const {}

  template <class Spin>
  void WorkerReached(Worker* worker, const Spin&, uint32_t epoch) const {
    HWY_DASSERT(worker->Index() != 0);  // main is 0
    worker->StoreBarrier(epoch);
  }

  template <class Spin>
  void UntilReached(size_t num_threads, const Worker* workers, const Spin& spin,
                    uint32_t epoch) const {
    for (size_t i = 0; i < num_threads; ++i) {
      (void)spin.UntilEqual(epoch, workers[1 + i].Barrier());
    }
  }
};

// Leader threads wait for others in the group, main thread loops over leaders.
template <size_t kGroupSize>
class BarrierGroup {
 public:
  BarrierType Type() const {
    return kGroupSize == 2 ? BarrierType::kGroup2 : BarrierType::kGroup4;
  }

  void Reset(Worker* /*workers*/) const {}

  template <class Spin>
  void WorkerReached(Worker* worker, const Spin& spin, uint32_t epoch) const {
    const size_t w_idx = worker->Index();
    HWY_DASSERT(w_idx != 0);  // main is 0
    // NOTE: the first worker is 1, but our leader election scheme requires a
    // 0-based index.
    const size_t rel_idx = w_idx - 1;

    Worker* workers = worker->AllWorkers();
    const size_t num_workers = 1 + workers->NumThreads();

    // Leaders (the first worker of each group) wait for all others in their
    // group before marking themselves.
    if (rel_idx % kGroupSize == 0) {
      for (size_t i = w_idx + 1; i < HWY_MIN(w_idx + kGroupSize, num_workers);
           ++i) {
        // No + 1 here: i is derived from w_idx which is the actual index.
        (void)spin.UntilEqual(epoch, workers[i].Barrier());
      }
    }
    worker->StoreBarrier(epoch);
  }

  template <class Spin>
  void UntilReached(size_t num_threads, const Worker* workers, const Spin& spin,
                    uint32_t epoch) const {
    for (size_t i = 0; i < num_threads; i += kGroupSize) {
      (void)spin.UntilEqual(epoch, workers[1 + i].Barrier());
    }
  }
};

// ------------------------------ Inlining policy classes

// We want to inline the various spin/wait/barrier policy classes into larger
// code sections because both the main and worker threads use two or three of
// them at a time, and we do not want separate branches around each.
//
// We generate code for three combinations of the enums, hence implement
// composable adapters that 'add' `Wait` and `Barrier` arguments. `spin.h`
// provides a `CallWithSpin`, hence it is the outermost. C++11 lacks generic
// lambdas, so we implement these as classes.
template <class Func>
class FunctorAddWait {
 public:
  FunctorAddWait(WaitType wait_type, Func&& func)
      : func_(std::forward<Func>(func)), wait_type_(wait_type) {}

  template <class Spin>
  HWY_INLINE void operator()(const Spin& spin) {
    switch (wait_type_) {
      case WaitType::kBlock:
        return func_(spin, WaitBlock());
      case WaitType::kSpin1:
        return func_(spin, WaitSpin1());
      case WaitType::kSpinSeparate:
        return func_(spin, WaitSpinSeparate());
      default:
        HWY_UNREACHABLE;
    }
  }

 private:
  Func&& func_;
  WaitType wait_type_;
};

template <class Func>
class FunctorAddBarrier {
 public:
  FunctorAddBarrier(BarrierType barrier_type, Func&& func)
      : func_(std::forward<Func>(func)), barrier_type_(barrier_type) {}

  template <class Wait>
  HWY_INLINE void operator()(const Wait& wait) {
    switch (barrier_type_) {
      case BarrierType::kOrdered:
        return func_(wait, BarrierOrdered());
      case BarrierType::kCounter1:
        return func_(wait, BarrierCounter<1>());
      case BarrierType::kCounter2:
        return func_(wait, BarrierCounter<2>());
      case BarrierType::kCounter4:
        return func_(wait, BarrierCounter<4>());
      case BarrierType::kGroup2:
        return func_(wait, BarrierGroup<2>());
      case BarrierType::kGroup4:
        return func_(wait, BarrierGroup<4>());
      default:
        HWY_UNREACHABLE;
    }
  }
  template <class Spin, class Wait>
  HWY_INLINE void operator()(const Spin& spin, const Wait& wait) {
    switch (barrier_type_) {
      case BarrierType::kOrdered:
        return func_(spin, wait, BarrierOrdered());
      case BarrierType::kCounter1:
        return func_(spin, wait, BarrierCounter<1>());
      case BarrierType::kCounter2:
        return func_(spin, wait, BarrierCounter<2>());
      case BarrierType::kCounter4:
        return func_(spin, wait, BarrierCounter<4>());
      case BarrierType::kGroup2:
        return func_(spin, wait, BarrierGroup<2>());
      case BarrierType::kGroup4:
        return func_(spin, wait, BarrierGroup<4>());
      default:
        HWY_UNREACHABLE;
    }
  }

 private:
  Func&& func_;
  BarrierType barrier_type_;
};

// Calls unrolled code selected by all 3 enums.
template <class Func>
HWY_INLINE void CallWithConfig(const Config& config, Func&& func) {
  CallWithSpin(
      config.spin_type,
      FunctorAddWait<FunctorAddBarrier<Func>>(
          config.wait_type, FunctorAddBarrier<Func>(config.barrier_type,
                                                    std::forward<Func>(func))));
}

// For `WorkerAdapter`, `Spin` and `Wait`.
template <class Func>
HWY_INLINE void CallWithSpinWait(const Config& config, Func&& func) {
  CallWithSpin(
      config.spin_type,
      FunctorAddWait<Func>(config.wait_type, std::forward<Func>(func)));
}

// For `WorkerAdapter`, only `Spin` and `Barrier`.
template <class Func>
HWY_INLINE void CallWithSpinBarrier(const Config& config, Func&& func) {
  CallWithSpin(
      config.spin_type,
      FunctorAddBarrier<Func>(config.barrier_type, std::forward<Func>(func)));
}

// ------------------------------ Adapters

// Logic of the main and worker threads, again packaged as classes because
// C++11 lacks generic lambdas, called by `CallWith*`.

class MainAdapter {
 public:
  MainAdapter(Worker* main, const Tasks* tasks) : main_(main), tasks_(tasks) {
    HWY_DASSERT(main_ == main->AllWorkers());  // main is first.
  }

  void SetEpoch(uint32_t epoch) { epoch_ = epoch; }

  template <class Spin, class Wait, class Barrier>
  HWY_POOL_PROFILE void operator()(const Spin& spin, const Wait& wait,
                                   const Barrier& barrier) const {
    Worker* workers = main_->AllWorkers();
    const size_t num_threads = main_->NumThreads();
    barrier.Reset(workers);

    wait.WakeWorkers(workers, epoch_);
    // Threads might still be starting up and wake up late, but we wait for
    // them at the barrier below.

    // Also perform work on the main thread before the barrier.
    tasks_->WorkerRun(main_);

    // Waits until all *threads* (not the main thread, because it already knows
    // it is here) called `WorkerReached`. All `barrier` types use spinning.

    barrier.UntilReached(num_threads, workers, spin, epoch_);

    // Threads may already be waiting `UntilWoken`, which serves as the
    // 'release' phase of the barrier.
  }

 private:
  Worker* const main_;
  const Tasks* const tasks_;
  uint32_t epoch_;
};

class WorkerAdapter {
 public:
  explicit WorkerAdapter(Worker* worker) : worker_(worker) {}

  void SetEpoch(uint32_t epoch) { epoch_ = epoch; }

 private:
  template <class Spin, class Wait>
  HWY_INLINE void CallImpl(hwy::SizeTag<1> /* second_param_type_tag */,
                           const Spin& spin, const Wait& wait) const {
    wait.UntilWoken(worker_, spin, epoch_);
  }
  template <class Spin, class Barrier>
  HWY_INLINE void CallImpl(hwy::SizeTag<2> /* second_param_type_tag */,
                           const Spin& spin, const Barrier& barrier) const {
    barrier.WorkerReached(worker_, spin, epoch_);
  }

 public:
  // Split into separate wait/barrier functions because `ThreadFunc` latches
  // the config in between them.
  template <class Spin, class Param2>
  hwy::EnableIf<hwy::IsSameEither<
      hwy::RemoveCvRef<decltype(hwy::RemoveCvRef<Param2>().Type())>, WaitType,
      BarrierType>()>
  operator()(const Spin& spin, const Param2& wait_or_barrier) const {
    // Use tag dispatch to work around template argument deduction error with
    // MSVC 2019.

    constexpr size_t kType =
        hwy::IsSame<
            hwy::RemoveCvRef<decltype(hwy::RemoveCvRef<Param2>().Type())>,
            WaitType>() ? 1 : 2;

    // Using this->CallImpl below ensures that WorkerAdapter::CallImpl is
    // selected and avoids unwanted argument dependent lookup.
    this->CallImpl(hwy::SizeTag<kType>(), spin, wait_or_barrier);
  }

 private:
  Worker* const worker_;
  uint32_t epoch_;
};

// Could also be a lambda in ThreadPool ctor, but this allows annotating with
// `HWY_POOL_PROFILE` so we can more easily inspect the generated code.
class ThreadFunc {
 public:
  ThreadFunc(Worker* worker, Tasks* tasks, Config config)
      : worker_(worker),
        tasks_(tasks),
        config_(config),
        worker_adapter_(worker_) {
    worker->LatchConfig(config);
  }

  HWY_POOL_PROFILE void operator()() {
    // Ensure main thread's writes are visible (synchronizes with fence in
    // `WorkerLifecycle::Init`).
    std::atomic_thread_fence(std::memory_order_acquire);

    HWY_DASSERT(worker_->Index() != 0);  // main is 0
    SetThreadName("worker%03zu", static_cast<int>(worker_->Index() - 1));
    hwy::Profiler::InitThread();

    // Initialization must match pre-increment in `MainAdapter::SetEpoch`.
    // Loop termination is triggered by `~ThreadPool`.
    for (uint32_t epoch = 1;; ++epoch) {
      worker_adapter_.SetEpoch(epoch);
      CallWithSpinWait(config_, worker_adapter_);

      // Must happen before `WorkerRun` because `SendConfig` writes it there.
      config_ = worker_->LatchedConfig();

      tasks_->WorkerRun(worker_);

      // Notify barrier after `WorkerRun`.
      CallWithSpinBarrier(config_, worker_adapter_);

      // Check after notifying the barrier, otherwise the main thread deadlocks.
      if (HWY_UNLIKELY(config_.exit)) break;
    }
  }

 private:
  Worker* const worker_;
  Tasks* const tasks_;

  Config config_;
  WorkerAdapter worker_adapter_;
};

}  // namespace pool

// Highly efficient parallel-for, intended for workloads with thousands of
// fork-join regions which consist of calling tasks[t](i) for a few hundred i,
// using dozens of threads.
//
// To reduce scheduling overhead, we assume that tasks are statically known and
// that threads do not schedule new work themselves. This allows us to avoid
// queues and only store a counter plus the current task. The latter is a
// pointer to a lambda function, without the allocation/indirection required for
// std::function.
//
// To reduce fork/join latency, we choose an efficient barrier, optionally
// enable spin-waits via SetWaitMode, and avoid any mutex/lock. We largely even
// avoid atomic RMW operations (LOCK prefix): currently for the wait and
// barrier, in future hopefully also for work stealing.
//
// To eliminate false sharing and enable reasoning about cache line traffic, the
// class is aligned and holds all worker state.
//
// For load-balancing, we use work stealing in random order.
class alignas(HWY_ALIGNMENT) ThreadPool {
 public:
  // This typically includes hyperthreads, hence it is a loose upper bound.
  // -1 because these are in addition to the main thread.
  static size_t MaxThreads() {
    LogicalProcessorSet lps;
    // This is OS dependent, but more accurate if available because it takes
    // into account restrictions set by cgroups or numactl/taskset.
    if (GetThreadAffinity(lps)) {
      return lps.Count() - 1;
    }
    return static_cast<size_t>(std::thread::hardware_concurrency() - 1);
  }

  // `num_threads` is the number of *additional* threads to spawn, which should
  // not exceed `MaxThreads()`. Note that the main thread also performs work.
  explicit ThreadPool(size_t num_threads)
      : have_timer_stop_(platform::HaveTimerStop(cpu100_)),
        num_threads_(ClampedNumThreads(num_threads)),
        div_workers_(1 + num_threads_),
        workers_(pool::WorkerLifecycle::Init(worker_bytes_, num_threads_,
                                             div_workers_)),
        // Assign main thread the first worker slot (it used to be the last).
        main_adapter_(workers_ + 0, &tasks_) {
    // Leaves the default wait mode as `kBlock`, which means futex, because
    // spinning only makes sense when threads are pinned and wake latency is
    // important, so it must explicitly be requested by calling `SetWaitMode`.
    for (PoolWaitMode mode : {PoolWaitMode::kSpin, PoolWaitMode::kBlock}) {
      wait_mode_ = mode;  // for AutoTuner
      AutoTuner().SetCandidates(
          pool::Config::AllCandidates(mode, num_threads_));
    }
    config_ = AutoTuner().Candidates()[0];

    threads_.reserve(num_threads_);
    for (size_t thread = 0; thread < num_threads_; ++thread) {
      threads_.emplace_back(
          pool::ThreadFunc(workers_ + 1 + thread, &tasks_, config_));
    }

    // No barrier is required here because wakeup works regardless of the
    // relative order of wake and wait.
  }

  // Waits for all threads to exit.
  ~ThreadPool() {
    // There is no portable way to request threads to exit like `ExitThread` on
    // Windows, otherwise we could call that from `Run`. Instead, we must cause
    // the thread to wake up and exit. We can use the same `SendConfig`
    // mechanism as `SetWaitMode`.
    pool::Config copy = config_;
    copy.exit = true;
    SendConfig(copy);

    for (std::thread& thread : threads_) {
      HWY_DASSERT(thread.joinable());
      thread.join();
    }

    pool::WorkerLifecycle::Destroy(workers_, num_threads_);
  }

  ThreadPool(const ThreadPool&) = delete;
  ThreadPool& operator&(const ThreadPool&) = delete;

  // Returns number of Worker, i.e., one more than the largest `worker`
  // argument. Useful for callers that want to allocate thread-local storage.
  size_t NumWorkers() const {
    return static_cast<size_t>(div_workers_.GetDivisor());
  }

  // `mode` defaults to `kBlock`, which means futex. Switching to `kSpin`
  // reduces fork-join overhead especially when there are many calls to `Run`,
  // but wastes power when waiting over long intervals. Must not be called
  // concurrently with any `Run`, because this uses the same waiter/barrier.
  void SetWaitMode(PoolWaitMode mode) {
    wait_mode_ = mode;
    SendConfig(AutoTuneComplete() ? *AutoTuner().Best()
                                  : AutoTuner().NextConfig());
  }

  // For printing which are in use.
  pool::Config config() const { return config_; }

  bool AutoTuneComplete() const { return AutoTuner().Best(); }
  Span<CostDistribution> AutoTuneCosts() { return AutoTuner().Costs(); }

  // parallel-for: Runs `closure(task, worker)` on workers for every `task` in
  // `[begin, end)`. Note that the unit of work should be large enough to
  // amortize the function call overhead, but small enough that each worker
  // processes a few tasks. Thus each `task` is usually a loop.
  //
  // Not thread-safe - concurrent parallel-for in the same `ThreadPool` are
  // forbidden unless `NumWorkers() == 1` or `end <= begin + 1`.
  template <class Closure>
  void Run(uint64_t begin, uint64_t end, const Closure& closure) {
    const size_t num_tasks = static_cast<size_t>(end - begin);
    const size_t num_workers = NumWorkers();

    // If zero or one task, or no extra threads, run on the main thread without
    // setting any member variables, because we may be re-entering Run.
    if (HWY_UNLIKELY(num_tasks <= 1 || num_workers == 1)) {
      for (uint64_t task = begin; task < end; ++task) {
        closure(task, /*worker=*/0);
      }
      return;
    }

    SetBusy();
    const bool is_root = PROFILER_IS_ROOT_RUN();

    tasks_.Set(begin, end, closure);

    // More than one task per worker: use work stealing.
    if (HWY_LIKELY(num_tasks > num_workers)) {
      pool::Tasks::DivideRangeAmongWorkers(begin, end, div_workers_, workers_);
    }

    main_adapter_.SetEpoch(++epoch_);

    AutoTuneT& auto_tuner = AutoTuner();
    if (HWY_LIKELY(auto_tuner.Best())) {
      CallWithConfig(config_, main_adapter_);
      if (is_root) {
        PROFILER_END_ROOT_RUN();
      }
      ClearBusy();
    } else {
      const uint64_t t0 = timer::Start();
      CallWithConfig(config_, main_adapter_);
      const uint64_t t1 = have_timer_stop_ ? timer::Stop() : timer::Start();
      auto_tuner.NotifyCost(t1 - t0);
      if (is_root) {
        PROFILER_END_ROOT_RUN();
      }
      ClearBusy();              // before `SendConfig`
      if (auto_tuner.Best()) {  // just finished
        HWY_IF_CONSTEXPR(pool::kVerbosity >= 1) {
          const size_t idx_best = static_cast<size_t>(
              auto_tuner.Best() - auto_tuner.Candidates().data());
          HWY_DASSERT(idx_best < auto_tuner.Costs().size());
          auto& AT = auto_tuner.Costs()[idx_best];
          const double best_cost = AT.EstimateCost();
          HWY_DASSERT(best_cost > 0.0);  // will divide by this below

          Stats s_ratio;
          for (size_t i = 0; i < auto_tuner.Costs().size(); ++i) {
            if (i == idx_best) continue;
            const double cost = auto_tuner.Costs()[i].EstimateCost();
            s_ratio.Notify(static_cast<float>(cost / best_cost));
          }

          fprintf(stderr, "  %s %5.0f +/- %4.0f. Gain %.2fx [%.2fx, %.2fx]\n",
                  auto_tuner.Best()->ToString().c_str(), best_cost, AT.Stddev(),
                  s_ratio.GeometricMean(), s_ratio.Min(), s_ratio.Max());
        }
        SendConfig(*auto_tuner.Best());
      } else {
        HWY_IF_CONSTEXPR(pool::kVerbosity >= 2) {
          fprintf(stderr, "  %s %5lu\n", config_.ToString().c_str(), t1 - t0);
        }
        SendConfig(auto_tuner.NextConfig());
      }
    }
  }

 private:
  // Used to initialize ThreadPool::num_threads_ from its ctor argument.
  static size_t ClampedNumThreads(size_t num_threads) {
    // Upper bound is required for `worker_bytes_`.
    if (HWY_UNLIKELY(num_threads > pool::kMaxThreads)) {
      HWY_WARN("ThreadPool: clamping num_threads %zu to %zu.", num_threads,
               pool::kMaxThreads);
      num_threads = pool::kMaxThreads;
    }
    return num_threads;
  }

  // Debug-only re-entrancy detection.
  void SetBusy() { HWY_DASSERT(!busy_.test_and_set()); }
  void ClearBusy() { HWY_IF_CONSTEXPR(HWY_IS_DEBUG_BUILD) busy_.clear(); }

  // Two-phase barrier protocol for sending `copy` to workers, similar to the
  // 'quiescent state' used in RCU.
  //
  // Phase 1:
  // - Main wakes threads using the old config.
  // - Threads latch `copy` during `WorkerRun`.
  // - Threads notify a barrier and wait for the next wake using the old config.
  //
  // Phase 2:
  // - Main wakes threads still using the old config.
  // - Threads switch their config to their latched `copy`.
  // - Threads notify a barrier and wait, BOTH with the new config.
  // - Main thread switches to `copy` for the next wake.
  HWY_NOINLINE void SendConfig(pool::Config copy) {
    if (NumWorkers() == 1) {
      config_ = copy;
      return;
    }

    SetBusy();

    const auto closure = [this, copy](uint64_t task, size_t worker) {
      (void)task;
      HWY_DASSERT(task == worker);  // one task per worker
      workers_[worker].LatchConfig(copy);
    };

    tasks_.Set(0, NumWorkers(), closure);
    // Same config as workers are *currently* using.
    main_adapter_.SetEpoch(++epoch_);
    CallWithConfig(config_, main_adapter_);
    // All workers have latched `copy` and are waiting with the old config.

    // No-op task; will not be called because begin == end.
    tasks_.Set(0, 0, [](uint64_t /*task*/, size_t /*worker*/) {});
    // Threads are waiting using the old config, but will switch after waking,
    // which means we must already use the new barrier.
    pool::Config new_barrier = config_;
    new_barrier.barrier_type = copy.barrier_type;
    main_adapter_.SetEpoch(++epoch_);
    CallWithConfig(new_barrier, main_adapter_);
    // All have woken and are, or will be, waiting per the *new* config. Now we
    // can entirely switch the main thread's config for the next wake.
    config_ = copy;

    ClearBusy();
  }

  using AutoTuneT = AutoTune<pool::Config, 30>;
  AutoTuneT& AutoTuner() {
    static_assert(static_cast<size_t>(PoolWaitMode::kBlock) == 1, "");
    return auto_tune_[static_cast<size_t>(wait_mode_) - 1];
  }
  const AutoTuneT& AutoTuner() const {
    return auto_tune_[static_cast<size_t>(wait_mode_) - 1];
  }

  char cpu100_[100];
  const bool have_timer_stop_;
  const size_t num_threads_;  // not including main thread
  const Divisor64 div_workers_;
  pool::Worker* const workers_;  // points into `worker_bytes_`

  pool::MainAdapter main_adapter_;

  // The only mutable state:
  pool::Tasks tasks_;    // written by `Run` and read by workers.
  pool::Config config_;  // for use by the next `Run`. Updated via `SendConfig`.
  uint32_t epoch_ = 0;   // passed to `MainAdapter`.

  // In debug builds, detects if functions are re-entered.
  std::atomic_flag busy_ = ATOMIC_FLAG_INIT;

  // Unmodified after ctor, but cannot be const because we call thread::join().
  std::vector<std::thread> threads_;

  PoolWaitMode wait_mode_;
  AutoTuneT auto_tune_[2];  // accessed via `AutoTuner`

  // Last because it is large. Store inside `ThreadPool` so that callers can
  // bind it to the NUMA node's memory. Not stored inside `WorkerLifecycle`
  // because that class would be initialized after `workers_`.
  alignas(HWY_ALIGNMENT) uint8_t
      worker_bytes_[sizeof(pool::Worker) * (pool::kMaxThreads + 1)];
};

}  // namespace hwy

#endif  // HIGHWAY_HWY_CONTRIB_THREAD_POOL_THREAD_POOL_H_
