# First experimental release
Welcome to the first release of gears, the experimental cross-platform asynchronous programming library for Scala 3!
In this first release, we introduce concepts for asynchronous programming, both low-level/unstructured (sources, listeners, channels) and high-level structured concurrency. This is the first iteration of a complete design, and it may still have holes in usability and performance. Feedback is appreciated!
A tutorial about the basic concepts will be available soon.
## What is included?
### User-facing, high-level asynchronous programming
`gears.async` introduces the concept of an `Async` context/capability: functions that implicitly take an `Async` context (`using Async`) are suspendable computations, capable of both performing asynchronous operations and spawning more concurrent asynchronous computations. This is the recommended way to write asynchronous code: functions that explicitly need to suspend or spawn concurrent computations should take an `Async` context:

```scala
def performAsyncIO(using Async): Int = ???
```

However, higher-order functions that do not explicitly use these capabilities do not have to take an `Async` context:

```scala
val result: Seq[Int] = (1 to 5).map(_ => performAsyncIO)
// ^^^ map is the regular Seq.map implementation!
```

- **Spawning concurrent computations**: computations that run concurrently with the caller can be spawned by invoking `Future.apply` with a body:

  ```scala
  val resultFuts: Seq[Future[Int]] = (1 to 5).map(_ => Future(performAsyncIO))
  ```

  Unlike the previous example, this creates 5 concurrently (possibly in parallel) running computations, each with a `Future` type that you can `await` for. `await`ing effectively suspends the current computation until the result of the awaited computation is available.

  ```scala
  val resultsTry: List[Try[Int]] = resultFuts.map(_.awaitResult)
  ```

  This awaits the results in order. Note that `.awaitResult` requires an `Async` context. It returns a `Try[Int]`, since the computation inside the `Future` may throw or be interrupted. To bypass this and directly get the result (rethrowing on `Failure`), use `.await`:

  ```scala
  val results: List[Int] = resultFuts.map(_.await)
  ```

- **Working with Futures**: some library functions are provided to make working with futures more convenient:
  - `Seq[Future[T]].awaitAll` waits for all futures to complete and returns a `Seq[T]` as a result, throwing when the first failure appears.
  - `Seq[Future[T]].altAll` waits for the first future to succeed, returning its value. If all fail, it returns the last failure.
  - Both variants provide a `WithCancel` alternative for owned futures, where if the wait is short-circuited, the other futures are optimistically cancelled.
  - `Async.select` allows you to race the futures, and continue the computation based on which value was received:

    ```scala
    val fFut = Future(f())
    val gFut = Future(g())
    val results = Async.select(
      fFut.andThen: x =>
        ???, // handle f result ...
      gFut.andThen: y =>
        ??? // handle g result ...
    )
    ```

- **Structured concurrency**: every `Async` context carries a completion group, tracking all concurrently running cancellable computations in a tree-like structure (groups can contain other groups). A group can be manually spawned by `Async.group`, which is automatically linked to the group present in the current `Async` context:

  ```scala
  val compute = Async.group:
    val f1 = Future(f())
    val g1 = Future(g())
    Seq(f1, g1).altAll
  ```

  Upon the return of the main body, all still-running concurrent computations are cancelled (by calling `.cancel()`) and awaited. This guarantees that once outside `Async.group`, no concurrent computations inside that group can still be running, maintaining the tree-like structure. `Future.apply` automatically creates a new group for its running body, so when you decide to compute something in parallel, you completely control its lifetime, including all of the concurrent computations that it spawned! When the main body of the group completes, all unneeded/unawaited running computations are cancelled and awaited for cleanup.

- **Going in and out of `Async`**: `Async.blocking` creates an `Async` context out of thin air (given a suspension implementation and a scheduler)! This `Async` context blocks the running thread for suspension, which is typically not what you want. However, it is useful in two scenarios:
  - As the root `Async` context: you would put this under the `main` function of the application.
  - As a truly blocking call to an asynchronous operation: not recommended, but this works similarly to Node.js `...Sync` variants, where the thread is blocked until the operation completes, returning the result directly.
- **Cross-platform**: `gears` is implemented generically, assuming only a suspension interface and a scheduler to work (see `AsyncSupport`). Two default implementations are provided:
  - using JVM >= 21's virtual threads, alongside the usual JVM virtual thread scheduler;
  - using Scala Native 0.5 delimited continuations, with the `ForkJoinPool` scheduler.

  Both can be provided by importing `gears.async.default.given`, which automatically selects the correct implementation for the platform.
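Tying the pieces above together, here is a minimal, hedged sketch of a complete program. `fetchNumber` is a hypothetical suspending function, and exact signatures may differ in this experimental release:

```scala
import gears.async.*
import gears.async.default.given // platform-appropriate suspension + scheduler

// A hypothetical suspending operation; anything taking `(using Async)`
// can suspend and spawn concurrent computations.
def fetchNumber(n: Int)(using Async): Int = n * 2

@main def demo(): Unit =
  // Async.blocking creates the root Async context, blocking the main
  // thread until the body completes.
  Async.blocking:
    // Spawn five concurrent computations...
    val futs = (1 to 5).map(n => Future(fetchNumber(n)))
    // ...and wait for all of them, throwing on the first failure.
    val results: Seq[Int] = futs.awaitAll
    println(results.sum) // 2 + 4 + 6 + 8 + 10
```

Because all five futures are spawned inside `Async.blocking`'s completion group, none of them can outlive the block.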
### Mid-level, unstructured asynchronous operations
- **Sources** are the abstraction of asynchronous resources that an `Async` context can `.awaitResult` for. From the high-level interface, we have been using `Future[T]`, which is actually a `Source[Try[T]]` with a special property: once completed, `Future[T]` always returns the same result.
- **Listeners** are the primary out-of-band way to receive values from a `Source`. The simplest implementation of a listener can be created with `Listener.apply`, which takes the item `T` and the origin `Source[T]` and does something with it! Listeners always receive at most one item from the `Source`, and are removed from the `Source`'s list once completed. Listener bodies are run on the same computation that resolves the sources, so their usage needs to be carefully tuned for a high degree of concurrency performance.
  - Locking listeners add synchronization capabilities to a listener, allowing it to listen to multiple sources and decide whether it is still open to accepting an item or has already expired. To learn more, check out the `Listener` interface.
- **Channels** are bidirectional channels that can be used as a means of communication between concurrent processes. They provide `.send(x: T)(using Async)` and `.read()(using Async)` as suspending methods, but also `.readSource` and `.sendSource(x: T)` sources that read/send an item when a listener is attached and accepts the item/event. Three types of channels are provided:
  - `SyncChannel`s are unbuffered, synchronous (rendezvous) channels, where sends and reads block until the transfer is actually made.
  - `BufferedChannel`s allow sends to be buffered up to a certain buffer count, completing instantly if buffer space is available.
  - `UnboundedChannel`s allow sends to always be buffered with a growable buffer, returning instantly in all cases. They expose a `.sendImmediately` method that allows sending without an `Async` context.
- **`race`, `select` and channel synchronization**: `gears` provides exclusivity when using the above `Async.select` method with sources: exactly one of the given sources will be resolved by the `select` (and no item/event will be consumed from the other sources). This is especially important for channels, as you typically would not want items from a channel to be raced and then thrown away. `race` has similar exclusivity semantics to `select`, but expects a list of `Source`s as parameters and returns a `Source`, resolving with an item when one of the inputs produces one. This allows you to compose `Source`s by racing them losslessly.
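As a hedged sketch of the channel API described above (a rendezvous `SyncChannel` between a producer and the main computation; exact constructor names and return types may differ in this release):

```scala
import gears.async.*
import gears.async.default.given

@main def channelDemo(): Unit =
  Async.blocking:
    val chan = SyncChannel[Int]()
    // Producer: each send suspends until a reader arrives (rendezvous).
    val producer = Future:
      for i <- 1 to 3 do chan.send(i)
    // Reader: each read receives exactly one item; nothing is dropped.
    val received = (1 to 3).map(_ => chan.read())
    producer.await
    println(received.size)
```

The same channel could also participate in an `Async.select` or `race` via its `.readSource`, with the exclusivity guarantee ensuring no item is consumed and discarded.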
### Low-level suspension API and scheduling
`gears`'s `Async` context requires the machinery of a Suspension API, which resembles delimited continuations, to work. While default implementations exist for JVM 21+ and Scala Native 0.5+, this layer is entirely replaceable: custom implementations (for example, to provide compatibility with JVM <= 20 through custom fibers) can be supplied to create an `Async` context through `Async.blocking`.
A Scheduler is also needed. By default, the JVM virtual thread scheduler is used on the JVM, and a `ForkJoinPool` is used on Scala Native.
Some "primitive" operations, such as `sleep`, are required from the scheduler/suspension implementation through the `AsyncOperations` interface. Again, default implementations are provided, but they can be customized.
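For example, a hedged sketch of suspending with `sleep` through `AsyncOperations`, assuming the default implementations from `gears.async.default.given` and a millisecond parameter:

```scala
import gears.async.*
import gears.async.default.given

@main def sleepDemo(): Unit =
  Async.blocking:
    val start = System.currentTimeMillis()
    // Suspends the current computation until the given time has elapsed;
    // provided by the scheduler via the AsyncOperations interface.
    AsyncOperations.sleep(100)
    println(System.currentTimeMillis() - start >= 100)
```

Swapping in a custom `AsyncOperations` (and `AsyncSupport`) instance would change how this suspension is implemented without touching user code.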
## What's coming up next?
- **A strawman interface for cross-platform I/O**: a separate library, `gears-io`, is in the early stages of design. It will provide a common asynchronous I/O (files, sockets) interface using gears' `Async` context functions. We hope to enable performant cross-platform asynchronous I/O through this interface.
- More refinement of the API and support for structured concurrency.
- Your input! We would appreciate any effort to start using the library, build something cool, and report the experience!