feat(core): dispatchers, scopes, and exceptions

HytaleDispatchers (World/HytaleIO/HytaleScheduled), per-key scope
registries (PlayerScopes, WorldScopes, PluginScopes) keyed by UUID
or plugin identity, and the sealed AsyncException hierarchy.
Includes unit tests against in-memory stubs.
This commit is contained in:
2026-04-28 16:29:55 +02:00
parent e201a227ea
commit a10294c01f
11 changed files with 541 additions and 0 deletions
+52
View File
@@ -0,0 +1,52 @@
# `:core`
Dispatchers and coroutine scopes. No knowledge of the Hytale SDK — the only
runtime deps are `kotlinx-coroutines` and SLF4J.
## Dispatchers
```kotlin
AsyncDispatchers.World(world) // confines to a specific world's main thread
AsyncDispatchers.HytaleIO // bounded pool for blocking I/O
AsyncDispatchers.HytaleScheduled // backs delay() and withTimeout()
AsyncDispatchers.configureIo(parallelism = 16)
AsyncDispatchers.configureScheduled(myExecutor)
```
`World` takes a `WorldExecutor` interface, not the Hytale `World` class
directly. The `:binding` module wires the real `World` in via
`World.asExecutor()`. Tests substitute single-thread executors keyed by UUID.
## Scopes
Three registries, all `SupervisorJob`-backed (a child failure doesn't kill
siblings) and all defaulting to `HytaleIO`:
```kotlin
PlayerScopes.of(uuid) // get-or-create
PlayerScopes.cancel(uuid) // idempotent
PlayerScopes.cancelAll()
PlayerScopes.activeCount()
WorldScopes.of(worldUuid) // same shape
PluginScopes.of(plugin) // identity-keyed (IdentityHashMap), not equality
```
`Async.shutdown()` cancels all three registries and is idempotent — call it
from your plugin's `shutdown()`.
## Exceptions
A sealed hierarchy under `AsyncException` so consumers can catch one type if
they want a uniform error path:
- `WorldClosedException` — dispatching to a dead world
- `NoWorldInContextException``MainHere` invoked off-context
- `ComponentNotFoundException` — strict `read`/`modify` on a missing component
- `ComponentTypeNotRegisteredException` — DSL used on an unregistered class
## Tests
`SmokeTest` and `ScopesTest` run against stub `WorldExecutor` implementations
backed by `Executors.newSingleThreadExecutor`. No Hytale jar required.
+19
View File
@@ -0,0 +1,19 @@
plugins {
alias(libs.plugins.kotlin.jvm)
`java-library`
}
dependencies {
api(libs.kotlinx.coroutines.core)
api(libs.kotlinx.coroutines.jdk8)
api(libs.slf4j.api)
compileOnly(libs.hytale.server)
testImplementation(libs.kotlinx.coroutines.test)
testImplementation(libs.junit.jupiter)
testRuntimeOnly(libs.junit.platform.launcher)
testImplementation(libs.kotest.assertions)
testImplementation(libs.mockk)
testRuntimeOnly(libs.slf4j.simple)
}
@@ -0,0 +1,24 @@
package com.mythlane.async
import com.mythlane.async.scope.PlayerScopes
import com.mythlane.async.scope.PluginScopes
import com.mythlane.async.scope.WorldScopes
/**
* Library-wide entry point. Currently exposes a single shutdown helper that
* cancels every coroutine scope tracked by Async — call from your
* plugin's `shutdown()`.
*
* Disconnect / world-unload wireup lives in the `:hytale` binding module
* (`installAsync()`), not here, so `:core` stays free of Hytale imports.
*
* @ThreadSafe
*/
object Async {
/** Cancels all player, world, and plugin scopes. Idempotent. */
fun shutdown() {
PlayerScopes.cancelAll()
WorldScopes.cancelAll()
PluginScopes.cancelAll()
}
}
@@ -0,0 +1,92 @@
package com.mythlane.async.dispatchers
import com.mythlane.async.exception.WorldClosedException
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asCoroutineDispatcher
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.CoroutineContext
/**
* Coroutine dispatchers tailored for the Hytale threading model.
*
* - [World]: confines work to a specific world's main thread.
* - [HytaleIO]: bounded pool for blocking I/O (DB, HTTP, file).
* - [HytaleScheduled]: backs `delay()` and timed tasks.
*
* @ThreadSafe All members may be accessed from any thread.
*/
@OptIn(kotlinx.coroutines.ExperimentalCoroutinesApi::class)
object AsyncDispatchers {
private val worldDispatchers = ConcurrentHashMap<java.util.UUID, CoroutineDispatcher>()
/** Default IO parallelism; override via [configureIo]. */
private const val DEFAULT_IO_PARALLELISM_MULTIPLIER = 2
@Volatile
private var ioDispatcher: CoroutineDispatcher = Dispatchers.IO.limitedParallelism(
Runtime.getRuntime().availableProcessors() * DEFAULT_IO_PARALLELISM_MULTIPLIER
)
@Volatile
private var scheduled: ScheduledExecutorService = Executors.newScheduledThreadPool(2) { r ->
Thread(r, "Async-Scheduled-${schedulerCounter.incrementAndGet()}").apply { isDaemon = true }
}
private val schedulerCounter = AtomicInteger(0)
/**
* Returns a dispatcher that confines coroutines to [world]'s main thread.
*
* Each dispatched task is wrapped to honor coroutine cancellation: if the
* job is cancelled before the world thread picks the task up, it is skipped.
*
* @throws WorldClosedException if [world] is no longer alive at dispatch time.
*/
fun World(world: WorldExecutor): CoroutineDispatcher =
worldDispatchers.computeIfAbsent(world.worldId) { WorldCoroutineDispatcher(world) }
/** Bounded IO dispatcher for blocking work. */
val HytaleIO: CoroutineDispatcher get() = ioDispatcher
/** Scheduled dispatcher; backs `delay()` and `withTimeout`. */
val HytaleScheduled: CoroutineDispatcher get() = scheduled.asCoroutineDispatcher()
/**
* Override the IO pool. Call once during plugin init if defaults aren't right.
*/
fun configureIo(parallelism: Int) {
require(parallelism > 0) { "parallelism must be > 0" }
ioDispatcher = Dispatchers.IO.limitedParallelism(parallelism)
}
/**
* Override the scheduled pool. Call once during plugin init.
* Production wiring should pass `HytaleServer.SCHEDULED_EXECUTOR`.
*/
fun configureScheduled(executor: ScheduledExecutorService) {
scheduled = executor
}
/** Test hook: drop the dispatcher cached for [worldId]. */
internal fun evictWorld(worldId: java.util.UUID) {
worldDispatchers.remove(worldId)
}
private class WorldCoroutineDispatcher(private val world: WorldExecutor) : CoroutineDispatcher() {
override fun dispatch(context: CoroutineContext, block: Runnable) {
if (!world.isAlive()) throw WorldClosedException(world.worldId.toString())
world.execute(Runnable {
// Skip if the coroutine was cancelled before the world thread woke up.
val job = context[kotlinx.coroutines.Job]
if (job != null && !job.isActive) return@Runnable
block.run()
})
}
override fun toString(): String = "Dispatchers.World(${world.worldId})"
}
}
@@ -0,0 +1,20 @@
package com.mythlane.async.dispatchers
import java.util.UUID
/**
* Minimal abstraction over the bits of `com.hypixel.hytale.api.world.World` that
* `Dispatchers.World` actually needs. Tests substitute a single-thread executor;
* production wires this to `world.execute(Runnable)` and `world.isAlive()`.
*
* Keeping this as a SAM-friendly interface (not directly the Hytale World class)
* means `:core` does NOT have a hard `compileOnly` dependency on the Hytale jar
* for its core primitives — easier testing, cleaner module boundary.
*
* @see com.mythlane.async.dispatchers.AsyncDispatchers.World
*/
interface WorldExecutor {
val worldId: UUID
fun isAlive(): Boolean
fun execute(task: Runnable)
}
@@ -0,0 +1,26 @@
package com.mythlane.async.exception
/**
* Sealed root of every exception thrown by the Async library.
* Consumers can catch this to handle any library-specific failure uniformly.
*/
sealed class AsyncException(message: String, cause: Throwable? = null) : RuntimeException(message, cause)
/** Thrown when dispatching to a world that is no longer alive. */
class WorldClosedException(worldId: String) :
AsyncException("World $worldId is closed; cannot dispatch")
/** Thrown by `Dispatchers.MainHere` when the current coroutine context has no associated world. */
class NoWorldInContextException :
AsyncException("No world found in current coroutine context")
/** Thrown when a strict component access cannot find the requested component on an entity ref. */
class ComponentNotFoundException(typeName: String) :
AsyncException("Component $typeName not present on entity ref")
/** Thrown when [com.mythlane.async.ecs.ComponentRegistry] has no mapping for the requested Kotlin class. */
class ComponentTypeNotRegisteredException(typeName: String) :
AsyncException(
"No ComponentType registered for $typeName. " +
"Call ComponentRegistry.register<$typeName>(componentType) during plugin start()."
)
@@ -0,0 +1,49 @@
package com.mythlane.async.scope
import com.mythlane.async.dispatchers.AsyncDispatchers
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import org.slf4j.LoggerFactory
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
/**
* Per-player [CoroutineScope] registry. Scopes are created lazily on first lookup
* and cancelled atomically when [cancel] is invoked (typically from the
* `PlayerDisconnectEvent` listener wired by `Async.install`).
*
* Each scope uses a [SupervisorJob] so a single child failure does not cascade
* to siblings, and defaults to [AsyncDispatchers.HytaleIO].
*
* @ThreadSafe
*/
object PlayerScopes {
private val log = LoggerFactory.getLogger(PlayerScopes::class.java)
private val scopes = ConcurrentHashMap<UUID, CoroutineScope>()
private val handler = CoroutineExceptionHandler { ctx, throwable ->
log.error("Unhandled exception in player scope ${ctx[kotlinx.coroutines.CoroutineName]?.name}", throwable)
}
/** Get-or-create the scope for [playerId]. */
fun of(playerId: UUID): CoroutineScope =
scopes.computeIfAbsent(playerId) {
CoroutineScope(SupervisorJob() + AsyncDispatchers.HytaleIO + handler)
}
/** Cancel and remove the scope for [playerId]; safe no-op if absent. */
fun cancel(playerId: UUID) {
scopes.remove(playerId)?.cancel()
}
/** Cancel everything; intended for plugin shutdown. */
fun cancelAll() {
val snapshot = scopes.values.toList()
scopes.clear()
snapshot.forEach { it.cancel() }
}
internal fun activeCount(): Int = scopes.size
}
@@ -0,0 +1,54 @@
package com.mythlane.async.scope
import com.mythlane.async.dispatchers.AsyncDispatchers
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import org.slf4j.LoggerFactory
import java.util.Collections
import java.util.IdentityHashMap
/**
* Per-plugin [CoroutineScope] registry. Identity-keyed by the plugin instance —
* two plugins with equal `equals` (rare for plugins) still get distinct scopes,
* which matches the actual lifecycle ownership.
*
* Cancelled in the plugin's `shutdown()`, typically via [com.mythlane.async.Async.shutdown].
*
* @ThreadSafe
*/
object PluginScopes {
private val log = LoggerFactory.getLogger(PluginScopes::class.java)
// IdentityHashMap because plugins are reference-identity entities; sync wrapper
// since IdentityHashMap is not concurrent.
private val scopes: MutableMap<Any, CoroutineScope> =
Collections.synchronizedMap(IdentityHashMap())
private val handler = CoroutineExceptionHandler { _, throwable ->
log.error("Unhandled exception in plugin scope", throwable)
}
fun of(plugin: Any): CoroutineScope = synchronized(scopes) {
scopes.getOrPut(plugin) {
CoroutineScope(SupervisorJob() + AsyncDispatchers.HytaleIO + handler)
}
}
fun cancel(plugin: Any) {
val scope = synchronized(scopes) { scopes.remove(plugin) }
scope?.cancel()
}
fun cancelAll() {
val snapshot = synchronized(scopes) {
val copy = scopes.values.toList()
scopes.clear()
copy
}
snapshot.forEach { it.cancel() }
}
internal fun activeCount(): Int = synchronized(scopes) { scopes.size }
}
@@ -0,0 +1,45 @@
package com.mythlane.async.scope
import com.mythlane.async.dispatchers.AsyncDispatchers
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import org.slf4j.LoggerFactory
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
/**
* Per-world [CoroutineScope] registry, keyed by world UUID. Cancelled by the
* `:hytale` binding on world unload (or manually until the unload event ships).
*
* Scopes use [SupervisorJob] so a single child failure does not cascade,
* default to [AsyncDispatchers.HytaleIO], and log unhandled exceptions.
*
* @ThreadSafe
*/
object WorldScopes {
private val log = LoggerFactory.getLogger(WorldScopes::class.java)
private val scopes = ConcurrentHashMap<UUID, CoroutineScope>()
private val handler = CoroutineExceptionHandler { _, throwable ->
log.error("Unhandled exception in world scope", throwable)
}
fun of(worldId: UUID): CoroutineScope =
scopes.computeIfAbsent(worldId) {
CoroutineScope(SupervisorJob() + AsyncDispatchers.HytaleIO + handler)
}
fun cancel(worldId: UUID) {
scopes.remove(worldId)?.cancel()
}
fun cancelAll() {
val snapshot = scopes.values.toList()
scopes.clear()
snapshot.forEach { it.cancel() }
}
internal fun activeCount(): Int = scopes.size
}
@@ -0,0 +1,83 @@
package com.mythlane.async
import com.mythlane.async.scope.PlayerScopes
import com.mythlane.async.scope.PluginScopes
import com.mythlane.async.scope.WorldScopes
import io.kotest.matchers.shouldBe
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Test
import java.util.UUID
import java.util.concurrent.atomic.AtomicBoolean
class ScopesTest {
@AfterEach fun tearDown() = Async.shutdown()
@Test fun `WorldScopes - cancel stops in-flight work and removes scope`() = runTest {
val id = UUID.randomUUID()
val ran = AtomicBoolean(false)
val job = WorldScopes.of(id).launch { delay(10_000); ran.set(true) }
WorldScopes.cancel(id)
job.join()
ran.get() shouldBe false
WorldScopes.activeCount() shouldBe 0
}
@Test fun `WorldScopes - same UUID returns same scope`() {
val id = UUID.randomUUID()
val a = WorldScopes.of(id)
val b = WorldScopes.of(id)
(a === b) shouldBe true
}
@Test fun `WorldScopes - distinct UUIDs are isolated on cancel`() = runTest {
val a = UUID.randomUUID(); val b = UUID.randomUUID()
val ranA = AtomicBoolean(false); val ranB = AtomicBoolean(false)
val jobA = WorldScopes.of(a).launch { delay(10_000); ranA.set(true) }
val jobB = WorldScopes.of(b).launch { delay(50); ranB.set(true) }
WorldScopes.cancel(a)
jobA.join(); jobB.join()
ranA.get() shouldBe false
ranB.get() shouldBe true
}
@Test fun `PluginScopes - identity-keyed and cancel works`() = runTest {
val plugin = Any()
val ran = AtomicBoolean(false)
val job = PluginScopes.of(plugin).launch { delay(10_000); ran.set(true) }
PluginScopes.cancel(plugin)
job.join()
ran.get() shouldBe false
PluginScopes.activeCount() shouldBe 0
}
@Test fun `PluginScopes - two equal-but-distinct plugin instances get separate scopes`() {
// Identity, not equality. Use a value-equal data class instance pair.
data class P(val name: String)
val p1 = P("x"); val p2 = P("x")
(p1 == p2) shouldBe true
val s1 = PluginScopes.of(p1)
val s2 = PluginScopes.of(p2)
(s1 === s2) shouldBe false
}
@Test fun `Async shutdown cancels all registries`() = runTest {
PlayerScopes.of(UUID.randomUUID()).launch { delay(10_000) }
WorldScopes.of(UUID.randomUUID()).launch { delay(10_000) }
PluginScopes.of(Any()).launch { delay(10_000) }
Async.shutdown()
PlayerScopes.activeCount() shouldBe 0
WorldScopes.activeCount() shouldBe 0
PluginScopes.activeCount() shouldBe 0
}
@Test fun `cancelAll is idempotent`() {
WorldScopes.cancelAll()
WorldScopes.cancelAll()
PluginScopes.cancelAll()
PluginScopes.cancelAll()
}
}
@@ -0,0 +1,77 @@
package com.mythlane.async
import com.mythlane.async.dispatchers.AsyncDispatchers
import com.mythlane.async.dispatchers.WorldExecutor
import com.mythlane.async.exception.WorldClosedException
import com.mythlane.async.scope.PlayerScopes
import io.kotest.assertions.throwables.shouldThrow
import io.kotest.matchers.shouldBe
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Test
import java.util.UUID
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
/**
* Smoke test: `Dispatchers.World` + `PlayerScopes` end-to-end against a stub world.
* Proves that work submitted from an arbitrary thread lands on the world's main thread,
* and that disconnecting a player cancels in-flight work.
*/
class SmokeTest {
private class StubWorld(override val worldId: UUID = UUID.randomUUID()) : WorldExecutor {
private val exec = Executors.newSingleThreadExecutor { r ->
Thread(r, "Stub-World-$worldId").also { mainThread.set(it) }
}
val mainThread = AtomicReference<Thread>()
private val alive = AtomicBoolean(true)
override fun isAlive() = alive.get()
override fun execute(task: Runnable) { exec.submit(task) }
fun shutdown() { alive.set(false); exec.shutdownNow(); exec.awaitTermination(2, TimeUnit.SECONDS) }
}
private val world = StubWorld()
@AfterEach fun tearDown() {
AsyncDispatchers.evictWorld(world.worldId)
PlayerScopes.cancelAll()
world.shutdown()
}
@Test fun `World dispatcher confines work to world main thread`() = runTest {
val landed = AtomicReference<Thread>()
withContext(AsyncDispatchers.World(world)) {
landed.set(Thread.currentThread())
}
landed.get() shouldBe world.mainThread.get()
}
@Test fun `dispatching to a dead world throws WorldClosedException`() = runTest {
world.shutdown()
shouldThrow<WorldClosedException> {
withContext(AsyncDispatchers.World(world)) { /* never runs */ }
}
}
@Test fun `cancelling player scope stops in-flight work`() = runTest {
val playerId = UUID.randomUUID()
val scope = PlayerScopes.of(playerId)
val ran = AtomicBoolean(false)
val job = scope.launch {
delay(10_000)
ran.set(true)
}
PlayerScopes.cancel(playerId)
job.join()
ran.get() shouldBe false
PlayerScopes.activeCount() shouldBe 0
}
}