/**
 Utilities for processing data in parallel on multiple background tasks.
 */
public struct ParallelProcessing {

    // NOTE: Private initializer, static utilities only.
    private init() { }

    // The default level of parallelism used to process items.
    public static let DEFAULT_MAX_PARALLEL_TASKS = 4

    /**
     Applies a processing task (`processItem`) to each item in parallel.

     Limits the maximum parallelism of the processing to `maxParallelTasks`
     to limit the number of concurrently running workers.

     NOTE: The default level of parallelism is `Self.DEFAULT_MAX_PARALLEL_TASKS`.

     - Parameters:
       - items: The items to process. Every item is handed to `processItem` exactly once.
       - maxParallelTasks: Upper bound on the number of concurrent workers. Values below 1
         are treated as 1; no more workers than items are started.
       - processInRandomOrder: When true, the items are shuffled before processing.
       - processItem: The work to perform for one item. Returning nil drops that item
         from the output.
     - Returns: The non-nil results of `processItem`. Results are grouped per worker in
       worker completion order, so the overall order is NOT guaranteed to match `items`.

     NOTE(review): The worker loop and the result merge were missing from the reviewed
     source and are reconstructed from the surrounding comments; confirm against
     version control.
     NOTE(review): Under strict concurrency checking `Item`/`Result` would presumably
     need to be `Sendable` and `processItem` `@Sendable`; the signature is left
     untouched to stay source-compatible with existing callers.
     */
    public static func processItemsInParallel<Item, Result>(
        items: [Item],
        maxParallelTasks: Int = DEFAULT_MAX_PARALLEL_TASKS,
        processInRandomOrder: Bool = true,
        processItem: @escaping (Item) async -> Result?
    ) async -> [Result] {
        // Initialize the processing queue, shuffling the items into random order if requested.
        let queue = OrderedParallelProcessingQueue(
            items: processInRandomOrder ? items.shuffled() : items
        )

        // Clamp the worker count: a negative value would trap when forming the range below,
        // zero would silently process nothing, and more workers than items are pointless.
        let workerCount = max(1, min(maxParallelTasks, items.count))

        // Process items in a single task group, with up to the max parallel tasks,
        // combining individual task results into one long list.
        let results: [Result] = await withTaskGroup(of: [Result].self) { taskGroup in
            for _ in (0..<workerCount) {
                taskGroup.addTask {
                    // Each worker keeps pulling items until the shared queue is drained.
                    var workerResults: [Result] = []
                    while let item = await queue.incrementAndGetNextItem() {
                        if let result = await processItem(item) {
                            workerResults.append(result)
                        }
                    }
                    return workerResults
                }
            }

            // Merge the per-worker lists as the workers finish.
            var combinedResults: [Result] = []
            for await workerResults in taskGroup {
                combinedResults.append(contentsOf: workerResults)
            }
            return combinedResults
        }
        return results
    }
}

/**
 A queue handing out items one at a time, in order, to concurrent workers.

 Declared as an actor so that concurrent calls to `incrementAndGetNextItem`
 are serialized and every item is handed out exactly once.

 NOTE(review): The original type declaration was missing from the reviewed source;
 the kind (`actor`) and access level (internal) are reconstructed, confirm against
 version control.
 */
actor OrderedParallelProcessingQueue<Item> {

    /**
     The list of items to be processed in background.
     */
    private let items: [Item]

    /**
     The index of the next item to be returned by `incrementAndGetNextItem`.
     Equal to `items.count` once every item has been handed out.
     */
    private var nextIndex: Int

    /**
     Initializes the queue with a list of items.
     */
    init(items: [Item]) {
        self.items = items
        self.nextIndex = 0
    }

    /**
     Returns the next item in the queue to process,
     or nil if all items have been handed out.
     */
    func incrementAndGetNextItem() -> Item? {
        // Ensure that we have a next item, and are not fully done yet.
        guard nextIndex < items.count else {
            return nil
        }

        // Hand out the item and advance; the index stops growing once the queue is drained.
        let item = items[nextIndex]
        nextIndex += 1
        return item
    }
}