/** Utilities for processing data in parallel across multiple concurrent background tasks. */
public struct ParallelProcessing {
    // NOTE: Private initializer, static utilities only.
    private init() { }

    /** The default level of parallelism used to process items. */
    public static let DEFAULT_MAX_PARALLEL_TASKS = 4

    /**
     Applies a processing task (`processItem`) to each item in parallel.

     Limits the maximum parallelism of the processing to `maxParallelTasks` to bound the number
     of concurrent worker tasks created.

     NOTE: The default level of parallelism is `Self.DEFAULT_MAX_PARALLEL_TASKS`.
     */
    public static func processItemsInParallel<Item, Result>(
        items: [Item],
        maxParallelTasks: Int = DEFAULT_MAX_PARALLEL_TASKS,
        processInRandomOrder: Bool = true,
        processItem: @escaping (Item) async -> Result?
    ) async -> [Result] {
        // Initialize the processing queue, shuffling the items into random order if requested.
        let queue = OrderedParallelProcessingQueue(
            items: processInRandomOrder ? items.shuffled() : items
        )

        // Process items in a single task group, with up to `maxParallelTasks` worker tasks,
        // combining individual task results into one long list.
        let results: [Result] = await withTaskGroup(of: [Result].self) { taskGroup in
            for _ in 0..<maxParallelTasks {
                let _ = taskGroup.addTaskUnlessCancelled {
                    // Continue processing until the queue is empty, checking for cancellation along the way.
                    var taskResults: [Result] = []
                    while let nextItem = await queue.incrementAndGetNextItem() {
                        guard !Task.isCancelled else { break }
                        guard let result = await processItem(nextItem) else { continue }
                        taskResults.append(result)
                    }
                    // Return the results of this worker task.
                    return taskResults
                }
            }

            // Combine all task result lists into one long list.
            return await taskGroup.reduce([]) { allResults, taskResults in
                allResults + taskResults
            }
        }

        // With the processing fully complete, return the overall result list.
        return results
    }
}

/**
 An actor that holds the list of items awaiting processing by multiple concurrent worker tasks.

 Serves items to the workers one at a time, without having to split the list into unevenly sized
 chunks, where some workers would go idle before all processing is complete.
 */
private actor OrderedParallelProcessingQueue<Item> {
    /** The list of items to be processed in the background. */
    private let items: [Item]

    /** The index of the last item returned by `incrementAndGetNextItem`, or nil if no item has been returned yet. */
    private var currentIndex: Int?

    /** Initializes the queue with a list of items. */
    init(items: [Item]) {
        self.items = items
        self.currentIndex = nil
    }

    /** Returns the next item in the queue to process, or nil if all items have been handed out. */
    func incrementAndGetNextItem() -> Item? {
        // Advance the index to the next item to process.
        if let currentIndexValue = currentIndex {
            // Processing has started; move to the following item.
            currentIndex = currentIndexValue + 1
        } else {
            // First query to the queue; start at the head.
            currentIndex = 0
        }

        // Ensure that there is a next item and that we are not fully done yet.
        guard let currentIndex = currentIndex, currentIndex < items.count else {
            return nil
        }

        // Return the next item to process.
        return items[currentIndex]
    }
}
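
// MARK: - Usage sketch

/**
 A minimal sketch of how `processItemsInParallel` might be called. The function name,
 the item values, and the parameter overrides below are illustrative assumptions and
 are not part of the utility itself.
 */
private func exampleSquareEvenNumbersInParallel() async -> [Int] {
    let numbers = Array(1...100)
    return await ParallelProcessing.processItemsInParallel(
        items: numbers,
        maxParallelTasks: 8,           // hypothetical override of the default of 4
        processInRandomOrder: false    // process items in their given order rather than shuffling
    ) { number -> Int? in
        // Returning nil skips an item: here only even numbers contribute a result.
        guard number.isMultiple(of: 2) else { return nil }
        return number * number
    }
    // Note: the order of results reflects worker completion order, not the order of `items`.
}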