From 7692fb502b8ef9fe1cf9a97582845d099c95bdf5 Mon Sep 17 00:00:00 2001 From: alex Date: Wed, 27 Jul 2022 10:40:24 +0800 Subject: [PATCH] add bus code --- .../gaea/bus/AllowConcurrentSubscribe.kt | 28 ++ .../kotlin/com/synebula/gaea/bus/AsyncBus.kt | 68 +++++ .../main/kotlin/com/synebula/gaea/bus/Bus.kt | 254 ++++++++++++++++ .../com/synebula/gaea/bus/DeadMessage.kt | 36 +++ .../com/synebula/gaea/bus/Dispatcher.kt | 151 ++++++++++ .../main/kotlin/com/synebula/gaea/bus/IBus.kt | 41 +++ .../kotlin/com/synebula/gaea/bus/IMessage.kt | 3 - .../com/synebula/gaea/bus/IMessageBus.kt | 28 -- .../com/synebula/gaea/bus/LoggingHandler.kt | 34 +++ .../kotlin/com/synebula/gaea/bus/Subscribe.kt | 36 +++ .../com/synebula/gaea/bus/Subscriber.kt | 128 ++++++++ .../gaea/bus/SubscriberExceptionContext.kt | 31 ++ .../gaea/bus/SubscriberExceptionHandler.kt | 24 ++ .../synebula/gaea/bus/SubscriberRegistry.kt | 193 ++++++++++++ .../gaea/bus/messagebus/AsyncMessageBus.kt | 71 +++++ .../gaea/bus/messagebus/IMessageBus.kt | 39 +++ .../gaea/bus/messagebus/MessageBus.kt | 280 ++++++++++++++++++ .../bus/messagebus/TopicSubscriberRegistry.kt | 184 ++++++++++++ .../kotlin/com/synebula/gaea/bus/BusTest.kt | 94 ++++++ 19 files changed, 1692 insertions(+), 31 deletions(-) create mode 100644 src/gaea/src/main/kotlin/com/synebula/gaea/bus/AllowConcurrentSubscribe.kt create mode 100644 src/gaea/src/main/kotlin/com/synebula/gaea/bus/AsyncBus.kt create mode 100644 src/gaea/src/main/kotlin/com/synebula/gaea/bus/Bus.kt create mode 100644 src/gaea/src/main/kotlin/com/synebula/gaea/bus/DeadMessage.kt create mode 100644 src/gaea/src/main/kotlin/com/synebula/gaea/bus/Dispatcher.kt create mode 100644 src/gaea/src/main/kotlin/com/synebula/gaea/bus/IBus.kt delete mode 100644 src/gaea/src/main/kotlin/com/synebula/gaea/bus/IMessage.kt delete mode 100644 src/gaea/src/main/kotlin/com/synebula/gaea/bus/IMessageBus.kt create mode 100644 src/gaea/src/main/kotlin/com/synebula/gaea/bus/LoggingHandler.kt create mode 100644 src/gaea/src/main/kotlin/com/synebula/gaea/bus/Subscribe.kt create mode 100644 src/gaea/src/main/kotlin/com/synebula/gaea/bus/Subscriber.kt create mode 100644 src/gaea/src/main/kotlin/com/synebula/gaea/bus/SubscriberExceptionContext.kt create mode 100644 src/gaea/src/main/kotlin/com/synebula/gaea/bus/SubscriberExceptionHandler.kt create mode 100644 src/gaea/src/main/kotlin/com/synebula/gaea/bus/SubscriberRegistry.kt create mode 100644 src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/AsyncMessageBus.kt create mode 100644 src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/IMessageBus.kt create mode 100644 src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/MessageBus.kt create mode 100644 src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/TopicSubscriberRegistry.kt create mode 100644 src/gaea/src/test/kotlin/com/synebula/gaea/bus/BusTest.kt diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/AllowConcurrentSubscribe.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/AllowConcurrentSubscribe.kt new file mode 100644 index 0000000..1fe2f13 --- /dev/null +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/AllowConcurrentSubscribe.kt @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2007 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.synebula.gaea.bus + +/** + * Marks an message subscriber method as being thread-safe. This annotation indicates that MessageBus + * may invoke the message subscriber simultaneously from multiple threads. + * + * + * This does not mark the method, and so should be used in combination with [Subscribe]. + * + * @author Cliff + * @since 10.0 + */ +@Retention(AnnotationRetention.RUNTIME) +@Target(AnnotationTarget.FUNCTION, AnnotationTarget.PROPERTY_GETTER, AnnotationTarget.PROPERTY_SETTER) +annotation class AllowConcurrentSubscribe \ No newline at end of file diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/AsyncBus.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/AsyncBus.kt new file mode 100644 index 0000000..1da1f56 --- /dev/null +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/AsyncBus.kt @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2007 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.synebula.gaea.bus + +import java.util.concurrent.Executor + +/** + * An [Bus] that takes the Executor of your choice and uses it to dispatch messages, + * allowing dispatch to occur asynchronously. + * + * @author Cliff + * @since 10.0 + */ +class AsyncBus : Bus { + /** + * Creates a new AsyncMessageBus that will use `executor` to dispatch messages. Assigns `identifier` as the bus's name for logging purposes. + * + * @param identifier short name for the bus, for logging purposes. + * @param executor Executor to use to dispatch messages. It is the caller's responsibility to shut + * down the executor after the last message has been posted to this message bus. + */ + constructor(identifier: String, executor: Executor) : super( + identifier, + executor, + Dispatcher.legacyAsync(), + LoggingHandler() + ) + + /** + * Creates a new AsyncMessageBus that will use `executor` to dispatch messages. + * + * @param executor Executor to use to dispatch messages. It is the caller's responsibility to shut + * down the executor after the last message has been posted to this message bus. + * @param subscriberExceptionHandler Handler used to handle exceptions thrown from subscribers. + * See [SubscriberExceptionHandler] for more information. + * @since 16.0 + */ + constructor(executor: Executor, subscriberExceptionHandler: SubscriberExceptionHandler) : super( + "default", + executor, + Dispatcher.legacyAsync(), + subscriberExceptionHandler + ) + + /** + * Creates a new AsyncMessageBus that will use `executor` to dispatch messages. + * + * @param executor Executor to use to dispatch messages. It is the caller's responsibility to shut + * down the executor after the last message has been posted to this message bus. + */ + constructor(executor: Executor) : super( + "default", + executor, + Dispatcher.legacyAsync(), + LoggingHandler() + ) +} \ No newline at end of file diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/Bus.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/Bus.kt new file mode 100644 index 0000000..bf36e15 --- /dev/null +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/Bus.kt @@ -0,0 +1,254 @@ +/* + * Copyright (C) 2007 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.synebula.gaea.bus + +import java.lang.reflect.Method +import java.util.* +import java.util.concurrent.Executor +import java.util.logging.Level +import java.util.logging.Logger + +/** + * Dispatches messages to listeners, and provides ways for listeners to register themselves. + * + *

Avoid MessageBus

+ * + * + * **We recommend against using MessageBus.** It was designed many years ago, and newer + * libraries offer better ways to decouple components and react to messages. + * + * + * To decouple components, we recommend a dependency-injection framework. For Android code, most + * apps use [Dagger](https://dagger.dev). For server code, common options include [Guice](https://github.com/google/guice/wiki/Motivation) and [Spring](https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-introduction). + * Frameworks typically offer a way to register multiple listeners independently and then request + * them together as a set ([Dagger](https://dagger.dev/dev-guide/multibindings), [Guice](https://github.com/google/guice/wiki/Multibindings), [Spring](https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-autowired-annotation)). + * + * + * To react to messages, we recommend a reactive-streams framework like [RxJava](https://github.com/ReactiveX/RxJava/wiki) (supplemented with its [RxAndroid](https://github.com/ReactiveX/RxAndroid) extension if you are building for + * Android) or [Project Reactor](https://projectreactor.io/). (For the basics of + * translating code from using an message bus to using a reactive-streams framework, see these two + * guides: [1](https://blog.jkl.gg/implementing-an-message-bus-with-rxjava-rxbus/), [2](https://lorentzos.com/rxjava-as-message-bus-the-right-way-10a36bdd49ba).) Some usages + * of MessageBus may be better written using [Kotlin coroutines](https://kotlinlang.org/docs/coroutines-guide.html), including [Flow](https://kotlinlang.org/docs/flow.html) and [Channels](https://kotlinlang.org/docs/channels.html). Yet other usages are better served + * by individual libraries that provide specialized support for particular use cases. + * + * + * Disadvantages of MessageBus include: + * + * + * * It makes the cross-references between producer and subscriber harder to find. This can + * complicate debugging, lead to unintentional reentrant calls, and force apps to eagerly + * initialize all possible subscribers at startup time. + * * It uses reflection in ways that break when code is processed by optimizers/minimizer like + * [R8 and Proguard](https://developer.android.com/studio/build/shrink-code). + * * It doesn't offer a way to wait for multiple messages before taking action. For example, it + * doesn't offer a way to wait for multiple producers to all report that they're "ready," nor + * does it offer a way to batch multiple messages from a single producer together. + * * It doesn't support backpressure and other features needed for resilience. + * * It doesn't provide much control of threading. + * * It doesn't offer much monitoring. + * * It doesn't propagate exceptions, so apps don't have a way to react to them. + * * It doesn't interoperate well with RxJava, coroutines, and other more commonly used + * alternatives. + * * It imposes requirements on the lifecycle of its subscribers. For example, if an message + * occurs between when one subscriber is removed and the next subscriber is added, the message + * is dropped. + * * Its performance is suboptimal, especially under Android. + * * It [doesn't support parameterized + * types](https://github.com/google/guava/issues/1431). + * * With the introduction of lambdas in Java 8, MessageBus went from less verbose than listeners + * to [more verbose](https://github.com/google/guava/issues/3311). + * + * + *

MessageBus Summary

+ * + * + * The MessageBus allows publish-subscribe-style communication between components without requiring + * the components to explicitly register with one another (and thus be aware of each other). It is + * designed exclusively to replace traditional Java in-process message distribution using explicit + * registration. It is *not* a general-purpose publish-subscribe system, nor is it intended + * for interprocess communication. + * + *

Receiving Messages

+ * + * + * To receive messages, an object should: + * + * + * 1. Expose a public method, known as the *message subscriber*, which accepts a single + * argument of the type of message desired; + * 1. Mark it with a [Subscribe] annotation; + * 1. Pass itself to an MessageBus instance's [.register] method. + * + * + *

Posting Messages

+ * + * + * To post an message, simply provide the message object to the [.post] method. The + * MessageBus instance will determine the type of message and route it to all registered listeners. + * + * + * Messages are routed based on their type an message will be delivered to any subscriber for + * any type to which the message is *assignable.* This includes implemented interfaces, all + * superclasses, and all interfaces implemented by superclasses. + * + * + * When `post` is called, all registered subscribers for an message are run in sequence, so + * subscribers should be reasonably quick. If an message may trigger an extended process (such as a + * database load), spawn a thread or queue it for later. (For a convenient way to do this, use an + * [AsyncBus].) + * + *

Subscriber Methods

+ * + * + * Message subscriber methods must accept only one argument: the message. + * + * + * Subscribers should not, in general, throw. If they do, the MessageBus will catch and log the + * exception. This is rarely the right solution for error handling and should not be relied upon; it + * is intended solely to help find problems during development. + * + * + * The MessageBus guarantees that it will not call a subscriber method from multiple threads + * simultaneously, unless the method explicitly allows it by bearing the [ ] annotation. If this annotation is not present, subscriber methods need not + * worry about being reentrant, unless also called from outside the MessageBus. + * + *

Dead Messages

+ * + * + * If an message is posted, but no registered subscribers can accept it, it is considered "dead." + * To give the system a second chance to handle dead messages, they are wrapped in an instance of + * [DeadMessage] and reposted. + * + * + * If a subscriber for a supertype of all messages (such as Object) is registered, no message will + * ever be considered dead, and no DeadMessages will be generated. Accordingly, while DeadMessage + * extends [Object], a subscriber registered to receive any Object will never receive a + * DeadMessage. + * + * + * This class is safe for concurrent use. + * + * + * See the Guava User Guide article on [`MessageBus`](https://github.com/google/guava/wiki/MessageBusExplained). + * + * @author Cliff + * @since 10.0 + * @param identifier a brief name for this bus, for logging purposes. Should/home/alex/privacy/project/myths/gaea be a valid Java + * @param executor the default executor this event bus uses for dispatching events to subscribers. + * @param dispatcher message dispatcher. + * @param exceptionHandler Handler for subscriber exceptions. + */ +open class Bus( + override val identifier: String, + override val executor: Executor, + val dispatcher: Dispatcher, + val exceptionHandler: SubscriberExceptionHandler, +) : IBus { + + private val subscribers: SubscriberRegistry = SubscriberRegistry(this) + + /** + * Creates a new MessageBus with the given `identifier`. + * + * @param identifier a brief name for this bus, for logging purposes. Should/home/alex/privacy/project/myths/gaea be a valid Java + * identifier. + */ + @JvmOverloads + constructor(identifier: String = "default") : this( + identifier, + Executor { it.run() }, + Dispatcher.perThreadDispatchQueue(), + LoggingHandler() + ) + + /** + * Creates a new MessageBus with the given [SubscriberExceptionHandler]. + * + * @param exceptionHandler Handler for subscriber exceptions. + * @since 16.0 + */ + constructor(exceptionHandler: SubscriberExceptionHandler) : this( + "default", + Executor { it.run() }, + Dispatcher.perThreadDispatchQueue(), + exceptionHandler + ) + + + /** + * Registers all subscriber methods on `object` to receive messages. + * + * @param subscriber object whose subscriber methods should be registered. + */ + override fun register(subscriber: Any) { + subscribers.register(subscriber) + } + + override fun register(subscriber: Any, method: Method) { + subscribers.register(subscriber, method) + } + + /** + * Unregisters all subscriber methods on a registered `object`. + * + * @param subscriber object whose subscriber methods should be unregistered. + * @throws IllegalArgumentException if the object was not previously registered. + */ + override fun unregister(subscriber: Any) { + subscribers.unregister(subscriber) + } + + /** + * Posts an message to all registered subscribers. This method will return successfully after the + * message has been posted to all subscribers, and regardless of any exceptions thrown by + * subscribers. + * + * + * If no subscribers have been subscribed for `message`'s class, and `message` is not + * already a [DeadMessage], it will be wrapped in a DeadMessage and reposted. + * + * @param message message to post. + */ + override fun publish(message: Any) { + val messageSubscribers = subscribers.getSubscribers(message) + if (messageSubscribers.hasNext()) { + dispatcher.dispatch(message, messageSubscribers) + } else if (message !is DeadMessage) { + // the message had no subscribers and was not itself a DeadMessage + publish(DeadMessage(this, message)) + } + } + + + /** Handles the given exception thrown by a subscriber with the given context. */ + override fun handleException(cause: Throwable?, context: SubscriberExceptionContext) { + try { + exceptionHandler.handleException(cause, context) + } catch (e2: Throwable) { + // if the handler threw an exception... well, just log it + logger.log( + Level.SEVERE, String.format(Locale.ROOT, "Exception %s thrown while handling exception: %s", e2, cause), + e2 + ) + } + } + + override fun toString(): String { + return "Bus(identifier='$identifier', executor=$executor, dispatcher=$dispatcher, exceptionHandler=$exceptionHandler, subscribers=$subscribers)" + } + + companion object { + private val logger = Logger.getLogger(Bus::class.java.name) + } +} \ No newline at end of file diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/DeadMessage.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/DeadMessage.kt new file mode 100644 index 0000000..1c044ac --- /dev/null +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/DeadMessage.kt @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2007 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.synebula.gaea.bus + +/** + * Wraps an message that was posted, but which had no subscribers and thus could not be delivered. + * + * + * Registering a DeadMessage subscriber is useful for debugging or logging, as it can detect + * misconfigurations in a system's message distribution. + * + * @author Cliff + * @since 10.0 + * + * Creates a new DeadMessage. + * + * @param source object broadcasting the DeadMessage (generally the [Bus]). + * @param message the message that could not be delivered. + */ +class DeadMessage(val source: Any?, val message: Any?) { + + override fun toString(): String { + return "DeadMessage(source=$source, message=$message)" + } +} \ No newline at end of file diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/Dispatcher.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/Dispatcher.kt new file mode 100644 index 0000000..daab5f6 --- /dev/null +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/Dispatcher.kt @@ -0,0 +1,151 @@ +/* + * Copyright (C) 2014 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.synebula.gaea.bus + +import java.util.* +import java.util.concurrent.ConcurrentLinkedQueue + +/** + * Handler for dispatching messages to subscribers, providing different message ordering guarantees that + * make sense for different situations. + * + * + * **Note:** The dispatcher is orthogonal to the subscriber's `Executor`. The dispatcher + * controls the order in which messages are dispatched, while the executor controls how (i.e. on which + * thread) the subscriber is actually called when an message is dispatched to it. + * + * @author Colin Decker + */ +abstract class Dispatcher { + /** Dispatches the given `message` to the given `subscribers`. */ + abstract fun dispatch(message: T, subscribers: Iterator>?) + + /** Implementation of a [.perThreadDispatchQueue] dispatcher. */ + private class PerThreadQueuedDispatcher : Dispatcher() { + // This dispatcher matches the original dispatch behavior of MessageBus. + /** Per-thread queue of messages to dispatch. */ + private val queue: ThreadLocal>> = object : ThreadLocal>>() { + override fun initialValue(): Queue> { + return ArrayDeque() + } + } + + /** Per-thread dispatch state, used to avoid reentrant message dispatching. */ + private val dispatching: ThreadLocal = object : ThreadLocal() { + override fun initialValue(): Boolean { + return false + } + } + + override fun dispatch(message: T, subscribers: Iterator>?) { + val queueForThread = queue.get() + queueForThread.offer(Message(message, subscribers)) + if (!dispatching.get()) { + dispatching.set(true) + try { + var nextMessage: Message? + while (queueForThread.poll().also { nextMessage = it } != null) { + while (nextMessage!!.subscribers!!.hasNext()) { + nextMessage!!.subscribers!!.next().dispatchMessage(nextMessage!!.message) + } + } + } finally { + dispatching.remove() + queue.remove() + } + } + } + + private class Message(val message: Any, val subscribers: Iterator>?) + } + + /** Implementation of a [.legacyAsync] dispatcher. */ + private class LegacyAsyncDispatcher : Dispatcher() { + // This dispatcher matches the original dispatch behavior of AsyncMessageBus. + // + // We can't really make any guarantees about the overall dispatch order for this dispatcher in + // a multithreaded environment for a couple reasons: + // + // 1. Subscribers to messages posted on different threads can be interleaved with each other + // freely. (A message on one thread, B message on another could yield any of + // [a1, a2, a3, b1, b2], [a1, b2, a2, a3, b2], [a1, b2, b3, a2, a3], etc.) + // 2. It's possible for subscribers to actually be dispatched to in a different order than they + // were added to the queue. It's easily possible for one thread to take the head of the + // queue, immediately followed by another thread taking the next element in the queue. That + // second thread can then dispatch to the subscriber it took before the first thread does. + // + // All this makes me really wonder if there's any value in queueing here at all. A dispatcher + // that simply loops through the subscribers and dispatches the message to each would actually + // probably provide a stronger order guarantee, though that order would obviously be different + // in some cases. + /** Global message queue. */ + private val queue = ConcurrentLinkedQueue>() + override fun dispatch(message: T, subscribers: Iterator>?) { + while (subscribers!!.hasNext()) { + queue.add(MessageWithSubscriber(message, subscribers.next())) + } + var e: MessageWithSubscriber? + while (queue.poll().also { e = it } != null) { + e!!.subscriber!!.dispatchMessage(e!!.message) + } + } + + private class MessageWithSubscriber(val message: T, val subscriber: Subscriber?) + } + + /** Implementation of [.immediate]. */ + private class ImmediateDispatcher : Dispatcher() { + override fun dispatch(message: T, subscribers: Iterator>?) { + while (subscribers!!.hasNext()) { + subscribers.next().dispatchMessage(message) + } + } + } + + companion object { + /** + * Returns a dispatcher that queues messages that are posted reentrantly on a thread that is already + * dispatching an message, guaranteeing that all messages posted on a single thread are dispatched to + * all subscribers in the order they are posted. + * + * + * When all subscribers are dispatched to using a *direct* executor (which dispatches on + * the same thread that posts the message), this yields a breadth-first dispatch order on each + * thread. That is, all subscribers to a single message A will be called before any subscribers to + * any messages B and C that are posted to the message bus by the subscribers to A. + */ + fun perThreadDispatchQueue(): Dispatcher { + return PerThreadQueuedDispatcher() + } + + /** + * Returns a dispatcher that queues messages that are posted in a single global queue. This behavior + * matches the original behavior of AsyncMessageBus exactly, but is otherwise not especially useful. + * For async dispatch, an [immediate][.immediate] dispatcher should generally be + * preferable. + */ + fun legacyAsync(): Dispatcher { + return LegacyAsyncDispatcher() + } + + /** + * Returns a dispatcher that dispatches messages to subscribers immediately as they're posted + * without using an intermediate queue to change the dispatch order. This is effectively a + * depth-first dispatch order, vs. breadth-first when using a queue. + */ + fun immediate(): Dispatcher { + return ImmediateDispatcher() + } + } +} \ No newline at end of file diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/IBus.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/IBus.kt new file mode 100644 index 0000000..207b5e3 --- /dev/null +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/IBus.kt @@ -0,0 +1,41 @@ +package com.synebula.gaea.bus + +import java.lang.reflect.Method +import java.util.concurrent.Executor + +interface IBus { + + val identifier: String + + val executor: Executor + + /** + * 注册事件Listener + * @param subscriber subscriber所在类 + */ + fun register(subscriber: Any) + + /** + * 取消注册事件Listener + * @param subscriber Listener所在类 + */ + fun register(subscriber: Any, method: Method) + + /** + * 取消注册事件Listener + * @param subscriber Listener所在类 + */ + fun unregister(subscriber: Any) + + /** + * 同步发布事件 + * @param message 事件 + */ + fun publish(message: T) + + + /** + * 异常处理 + */ + fun handleException(cause: Throwable?, context: SubscriberExceptionContext) +} \ No newline at end of file diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/IMessage.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/IMessage.kt deleted file mode 100644 index e560295..0000000 --- a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/IMessage.kt +++ /dev/null @@ -1,3 +0,0 @@ -package com.synebula.gaea.bus - -interface IMessage \ No newline at end of file diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/IMessageBus.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/IMessageBus.kt deleted file mode 100644 index 0f435e2..0000000 --- a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/IMessageBus.kt +++ /dev/null @@ -1,28 +0,0 @@ -package com.synebula.gaea.bus - -interface IMessageBus { - - /** - * 注册事件Listener - * @param obj Listener所在类 - */ - fun register(obj: Any) - - /** - * 取消注册事件Listener - * @param obj Listener所在类 - */ - fun unregister(obj: Any) - - /** - * 同步发布事件 - * @param message 事件 - */ - fun publish(message: IMessage) - - /** - * 异步发布事件 - * @param message 事件 - */ - fun publishAsync(message: IMessage) -} \ No newline at end of file diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/LoggingHandler.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/LoggingHandler.kt new file mode 100644 index 0000000..0cab54a --- /dev/null +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/LoggingHandler.kt @@ -0,0 +1,34 @@ +package com.synebula.gaea.bus + +import java.util.logging.Level +import java.util.logging.Logger + + +/** Simple logging handler for subscriber exceptions. */ +internal class LoggingHandler : SubscriberExceptionHandler { + override fun handleException(exception: Throwable?, context: SubscriberExceptionContext) { + val logger = logger(context) + if (logger.isLoggable(Level.SEVERE)) { + logger.log(Level.SEVERE, message(context), exception) + } + } + + companion object { + private fun logger(context: SubscriberExceptionContext): Logger { + return Logger.getLogger(Bus::class.java.name + "." + context.bus.identifier) + } + + private fun message(context: SubscriberExceptionContext): String { + val method = context.subscriberMethod + return ("Exception thrown by subscriber method " + + method.name + + '(' + + method.parameterTypes[0].name + + ')' + + " on subscriber " + + context.subscriber + + " when dispatching message: " + + context.message) + } + } +} \ No newline at end of file diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/Subscribe.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/Subscribe.kt new file mode 100644 index 0000000..2a7ab0d --- /dev/null +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/Subscribe.kt @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2007 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.synebula.gaea.bus + +/** + * Marks a method as a message subscriber. + * + * + * The type of message will be indicated by the method's first (and only) parameter, which cannot + * be primitive. If this annotation is applied to methods with zero parameters, or more than one + * parameter, the object containing the method will not be able to register for message delivery from + * the [Bus]. + * + * + * Unless also annotated with @[AllowConcurrentSubscribe], message subscriber methods will be + * invoked serially by each message bus that they are registered with. + * + * @author Cliff + * @since 10.0 + * + * @param topics method subscribe topics, only for [com.synebula.gaea.bus.messagebus.MessageBus] + */ +@Retention(AnnotationRetention.RUNTIME) +@Target(AnnotationTarget.FUNCTION, AnnotationTarget.PROPERTY_GETTER, AnnotationTarget.PROPERTY_SETTER) +annotation class Subscribe(val topics: Array = []) \ No newline at end of file diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/Subscriber.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/Subscriber.kt new file mode 100644 index 0000000..c66d422 --- /dev/null +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/Subscriber.kt @@ -0,0 +1,128 @@ +/* + * Copyright (C) 2014 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.synebula.gaea.bus + +import java.lang.reflect.InvocationTargetException +import java.lang.reflect.Method +import java.util.concurrent.Executor + +/** + * A subscriber method on a specific object, plus the executor that should be used for dispatching + * messages to it. + * + * + * Two subscribers are equivalent when they refer to the same method on the same object (not + * class). This property is used to ensure that no subscriber method is registered more than once. + * + * @author Colin Decker + * + * @param bus The message bus this subscriber belongs to. + * @param target The object with the subscriber method. + * @param method Subscriber method. + */ +open class Subscriber private constructor( + private val bus: IBus, + val target: Any, + val method: Method, +) { + /** Executor to use for dispatching messages to this subscriber. */ + private val executor: Executor? + + init { + method.isAccessible = true + executor = bus.executor + } + + /** Dispatches `message` to this subscriber using the proper executor. */ + fun dispatchMessage(message: Any) { + executor!!.execute { + try { + invokeSubscriberMethod(message) + } catch (e: InvocationTargetException) { + bus.handleException(e.cause, context(message)) + } + } + } + + /** + * Invokes the subscriber method. This method can be overridden to make the invocation + * synchronized. + */ + @Throws(InvocationTargetException::class) + open fun invokeSubscriberMethod(message: Any) { + try { + method.invoke(target, message) + } catch (e: IllegalArgumentException) { + throw Error("Method rejected target/argument: $message", e) + } catch (e: IllegalAccessException) { + throw Error("Method became inaccessible: $message", e) + } catch (e: InvocationTargetException) { + if (e.cause is Error) { + throw (e.cause as Error?)!! + } + throw e + } + } + + /** Gets the context for the given message. */ + private fun context(message: Any): SubscriberExceptionContext { + return SubscriberExceptionContext(bus, message, target, method) + } + + override fun hashCode(): Int { + return (31 + method.hashCode()) * 31 + System.identityHashCode(target) + } + + override fun equals(other: Any?): Boolean { + if (other is Subscriber<*>) { + // Use == so that different equal instances will still receive messages. + // We only guard against the case that the same object is registered + // multiple times + return target === other.target && method == other.method + } + return false + } + + /** + * Subscriber that synchronizes invocations of a method to ensure that only one thread may enter + * the method at a time. + */ + internal class SynchronizedSubscriber(bus: IBus, target: Any, method: Method) : + Subscriber(bus, target, method) { + @Throws(InvocationTargetException::class) + override fun invokeSubscriberMethod(message: Any) { + synchronized(this) { super.invokeSubscriberMethod(message) } + } + } + + companion object { + /** Creates a `Subscriber` for `method` on `subscriber`. */ + fun create(bus: IBus, subscriber: Any, method: Method): Subscriber { + return if (isDeclaredThreadSafe(method)) Subscriber( + bus, + subscriber, + method + ) else SynchronizedSubscriber( + bus, subscriber, method + ) + } + + /** + * Checks whether `method` is thread-safe, as indicated by the presence of the [ ] annotation. + */ + private fun isDeclaredThreadSafe(method: Method): Boolean { + return method.getAnnotation(AllowConcurrentSubscribe::class.java) != null + } + } +} diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/SubscriberExceptionContext.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/SubscriberExceptionContext.kt new file mode 100644 index 0000000..d0b9bb4 --- /dev/null +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/SubscriberExceptionContext.kt @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2013 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.synebula.gaea.bus + +import java.lang.reflect.Method + +/** + * Context for an exception thrown by a subscriber. + * + * @since 16.0 + * + * @param bus The [IBus] that handled the message and the subscriber. Useful for + * broadcasting a new message based on the error. + * @param message The message object that caused the subscriber to throw. + * @param subscriber The source subscriber context. + * @param subscriberMethod the subscribed method. + */ +class SubscriberExceptionContext internal constructor( + val bus: IBus, val message: Any, val subscriber: Any, val subscriberMethod: Method, +) \ No newline at end of file diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/SubscriberExceptionHandler.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/SubscriberExceptionHandler.kt new file mode 100644 index 0000000..14c40f2 --- /dev/null +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/SubscriberExceptionHandler.kt @@ -0,0 +1,24 @@ +/* + * Copyright (C) 2013 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.synebula.gaea.bus + +/** + * Handler for exceptions thrown by message subscribers. + * + * @since 16.0 + */ +interface SubscriberExceptionHandler { + /** Handles exceptions thrown by subscribers. */ + fun handleException(exception: Throwable?, context: SubscriberExceptionContext) +} \ No newline at end of file diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/SubscriberRegistry.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/SubscriberRegistry.kt new file mode 100644 index 0000000..05138f0 --- /dev/null +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/SubscriberRegistry.kt @@ -0,0 +1,193 @@ +/* + * Copyright (C) 2014 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.synebula.gaea.bus + +import com.synebula.gaea.reflect.Types +import java.lang.reflect.Method +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.CopyOnWriteArraySet + +/** + * Registry of subscribers to a single message bus. + * + * @author Colin Decker + */ +open class SubscriberRegistry(private val bus: IBus) { + /** + * All registered subscribers, indexed by message type. + * + * + * The [CopyOnWriteArraySet] values make it easy and relatively lightweight to get an + * immutable snapshot of all current subscribers to a message without any locking. + */ + private val subscribers = ConcurrentHashMap, CopyOnWriteArraySet>>() + + + /** Registers all subscriber methods on the given subscriber object. */ + open fun register(subscriber: Any) { + val listenerMethods = findAllSubscribers(subscriber) + for ((eventType, messageMethodsInListener) in listenerMethods) { + var messageSubscribers = subscribers[eventType] + if (messageSubscribers == null) { + val newSet = CopyOnWriteArraySet>() + messageSubscribers = subscribers.putIfAbsent(eventType, newSet) ?: newSet + } + messageSubscribers.addAll(messageMethodsInListener) + } + } + + /** Registers subscriber method on the given subscriber object. */ + open fun register(subscriber: Any, method: Method) { + val parameterTypes = method.parameterTypes + val eventType = parameterTypes[0] + check(parameterTypes.size == 1) { + "Method $method has @SubscribeTopic annotation but has ${parameterTypes.size} parameters. Subscriber methods must have exactly 1 parameter." + } + check(!parameterTypes[0].isPrimitive) { + "@SubscribeTopic method $method's parameter is ${parameterTypes[0].name}. Subscriber methods cannot accept primitives. " + } + var messageSubscribers = subscribers[eventType] + if (messageSubscribers == null) { + val newSet = CopyOnWriteArraySet>() + messageSubscribers = subscribers.putIfAbsent(eventType, newSet) ?: newSet + } + messageSubscribers.add(Subscriber.create(bus, subscriber, method)) + } + + + /** Unregisters all subscribers on the given subscriber object. */ + open fun unregister(subscriber: Any) { + val listenerMethods = findAllSubscribers(subscriber) + for ((eventType, listenerMethodsForType) in listenerMethods) { + val currentSubscribers = subscribers[eventType] + require(!(currentSubscribers == null || !currentSubscribers.removeAll(listenerMethodsForType.toSet()))) { + // if removeAll returns true, all we really know is that at least one subscriber was + // removed... however, barring something very strange we can assume that if at least one + // subscriber was removed, all subscribers on subscriber for that message type were... after + // all, the definition of subscribers on a particular class is totally static + "missing message subscriber for an annotated method. Is $subscriber registered?" + } + + // don't try to remove the set if it's empty; that can't be done safely without a lock + // anyway, if the set is empty it'll just be wrapping an array of length 0 + } + } + + /** + * Gets an iterator representing an immutable snapshot of all subscribers to the given message at + * the time this method is called. + */ + fun getSubscribers(eventType: Any): Iterator> { + val eventSubscribers: CopyOnWriteArraySet> = + subscribers[eventType.javaClass] ?: CopyOnWriteArraySet() + return eventSubscribers.iterator() + } + + /** + * Returns all subscribers for the given subscriber grouped by the type of message they subscribe to. + */ + private fun findAllSubscribers(subscriber: Any): Map, List>> { + val methodsInListener = mutableMapOf, List>>() + val clazz: Class<*> = subscriber.javaClass + for (method in getAnnotatedMethods(clazz)) { + val parameterTypes = method.parameterTypes + val eventType = parameterTypes[0] + val methods = methodsInListener[eventType]?.toMutableList() ?: mutableListOf() + methods.add(Subscriber.create(bus, subscriber, method)) + methodsInListener[eventType] = methods + } + return methodsInListener.toMap() + } + + + class MethodIdentifier(method: Method) { + private val name: String = method.name + private val parameterTypes: List> = listOf(*method.parameterTypes) + + override fun hashCode(): Int { + var result = name.hashCode() + for (element in parameterTypes) result = 31 * result + element.hashCode() + return result + } + + override fun equals(other: Any?): Boolean { + if (other is MethodIdentifier) { + return name == other.name && parameterTypes == other.parameterTypes + } + return false + } + } + + /** + * A thread-safe cache that contains the mapping from each class to all methods in that class and + * all super-classes, that are annotated with `@Subscribe`. The cache is shared across all + * instances of this class; this greatly improves performance if multiple MessageBus instances are + * created and objects of the same class are registered on all of them. + */ + protected val subscriberMethodsCache = mapOf, List>() + + @Synchronized + protected fun getAnnotatedMethods(clazz: Class<*>): List { + var methods = subscriberMethodsCache[clazz] + if (methods == null) + methods = getAnnotatedMethodsNotCached(clazz, Subscribe::class.java) + return methods + } + + protected fun getAnnotatedMethodsNotCached( + clazz: Class<*>, + annotationClass: Class, + ): List { + val supertypes = flattenHierarchy(clazz) + val identifiers = mutableMapOf() + for (supertype in supertypes) { + for (method in supertype.declaredMethods) { + if (method.isAnnotationPresent(annotationClass) && !method.isSynthetic) { + val parameterTypes = method.parameterTypes + check(parameterTypes.size == 1) { + "Method $method has @SubscribeTopic annotation but has ${parameterTypes.size} parameters. Subscriber methods must have exactly 1 parameter." + } + check(!parameterTypes[0].isPrimitive) { + "@SubscribeTopic method $method's parameter is ${parameterTypes[0].name}. Subscriber methods cannot accept primitives. " + } + + val identifier = MethodIdentifier(method) + if (!identifiers.containsKey(identifier)) { + identifiers[identifier] = method + } + } + } + } + return identifiers.values.toList() + } + + companion object { + + /** Global cache of classes to their flattened hierarchy of supertypes. */ + protected val flattenHierarchyCache = mutableMapOf, Set>>() + + /** + * Flattens a class's type hierarchy into a set of `Class` objects including all + * superclasses (transitively) and all interfaces implemented by these superclasses. + */ + fun flattenHierarchy(clazz: Class<*>): Set> { + var supertypes = flattenHierarchyCache[clazz] + if (supertypes == null) { + supertypes = Types.supertypes(clazz) + flattenHierarchyCache[clazz] = supertypes + } + return supertypes + } + } +} \ No newline at end of file diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/AsyncMessageBus.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/AsyncMessageBus.kt new file mode 100644 index 0000000..30cb8b5 --- /dev/null +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/AsyncMessageBus.kt @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2007 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.synebula.gaea.bus.messagebus + +import com.synebula.gaea.bus.Dispatcher +import com.synebula.gaea.bus.LoggingHandler +import com.synebula.gaea.bus.SubscriberExceptionHandler +import java.util.concurrent.Executor + +/** + * An [MessageBus] that takes the Executor of your choice and uses it to dispatch events, + * allowing dispatch to occur asynchronously. + * + * @author Cliff + * @since 10.0 + */ +class AsyncMessageBus : MessageBus { + /** + * Creates a new AsyncEventBus that will use `executor` to dispatch events. Assigns `identifier` as the bus's name for logging purposes. + * + * @param identifier short name for the bus, for logging purposes. + * @param executor Executor to use to dispatch events. It is the caller's responsibility to shut + * down the executor after the last event has been posted to this event bus. + */ + constructor(identifier: String, executor: Executor) : super( + identifier, + executor, + Dispatcher.legacyAsync(), + LoggingHandler() + ) + + /** + * Creates a new AsyncEventBus that will use `executor` to dispatch events. + * + * @param executor Executor to use to dispatch events. It is the caller's responsibility to shut + * down the executor after the last event has been posted to this event bus. + * @param subscriberExceptionHandler Handler used to handle exceptions thrown from subscribers. + * See [SubscriberExceptionHandler] for more information. + * @since 16.0 + */ + constructor(executor: Executor, subscriberExceptionHandler: SubscriberExceptionHandler) : super( + "default", + executor, + Dispatcher.legacyAsync(), + subscriberExceptionHandler + ) + + /** + * Creates a new AsyncEventBus that will use `executor` to dispatch events. + * + * @param executor Executor to use to dispatch events. It is the caller's responsibility to shut + * down the executor after the last event has been posted to this event bus. + */ + constructor(executor: Executor) : super( + "default", + executor, + Dispatcher.legacyAsync(), + LoggingHandler() + ) +} \ No newline at end of file diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/IMessageBus.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/IMessageBus.kt new file mode 100644 index 0000000..da731fb --- /dev/null +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/IMessageBus.kt @@ -0,0 +1,39 @@ +package com.synebula.gaea.bus.messagebus + +import com.synebula.gaea.bus.IBus +import java.lang.reflect.Method + +interface IMessageBus : IBus { + + /** + * 注册事件Listener + * @param topics 主题 + * @param subscriber subscriber对象 + */ + fun register(topics: Array, subscriber: Any) + + /** + * 注册事件Listener + * @param topics 主题 + * @param subscriber subscriber对象 + * @param method Listener方法 + */ + fun register(topics: Array, subscriber: Any, method: Method) + + + /** + * 取消注册事件Listener + * @param topic 主题 + * @param subscriber subscriber对象 + */ + fun unregister(topic: String, subscriber: Any) + + + /** + * 同步发布事件 + * @param topic 主题 + * @param message 事件 + */ + fun publish(topic: String, message: T) + +} \ No newline at end of file diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/MessageBus.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/MessageBus.kt new file mode 100644 index 0000000..db6a4aa --- /dev/null +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/MessageBus.kt @@ -0,0 +1,280 @@ +/* + * Copyright (C) 2007 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.synebula.gaea.bus.messagebus + +import com.synebula.gaea.bus.* +import java.lang.reflect.Method +import java.util.* +import java.util.concurrent.Executor +import java.util.logging.Level +import java.util.logging.Logger + +/** + * Dispatches events to listeners, and provides ways for listeners to register themselves. + * + *

Avoid EventBus

+ * + * + * **We recommend against using EventBus.** It was designed many years ago, and newer + * libraries offer better ways to decouple components and react to events. + * + * + * To decouple components, we recommend a dependency-injection framework. For Android code, most + * apps use [Dagger](https://dagger.dev). For server code, common options include [Guice](https://github.com/google/guice/wiki/Motivation) and [Spring](https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-introduction). + * Frameworks typically offer a way to register multiple listeners independently and then request + * them together as a set ([Dagger](https://dagger.dev/dev-guide/multibindings), [Guice](https://github.com/google/guice/wiki/Multibindings), [Spring](https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-autowired-annotation)). + * + * + * To react to events, we recommend a reactive-streams framework like [RxJava](https://github.com/ReactiveX/RxJava/wiki) (supplemented with its [RxAndroid](https://github.com/ReactiveX/RxAndroid) extension if you are building for + * Android) or [Project Reactor](https://projectreactor.io/). (For the basics of + * translating code from using an event bus to using a reactive-streams framework, see these two + * guides: [1](https://blog.jkl.gg/implementing-an-event-bus-with-rxjava-rxbus/), [2](https://lorentzos.com/rxjava-as-event-bus-the-right-way-10a36bdd49ba).) Some usages + * of EventBus may be better written using [Kotlin coroutines](https://kotlinlang.org/docs/coroutines-guide.html), including [Flow](https://kotlinlang.org/docs/flow.html) and [Channels](https://kotlinlang.org/docs/channels.html). Yet other usages are better served + * by individual libraries that provide specialized support for particular use cases. + * + * + * Disadvantages of EventBus include: + * + * + * * It makes the cross-references between producer and subscriber harder to find. This can + * complicate debugging, lead to unintentional reentrant calls, and force apps to eagerly + * initialize all possible subscribers at startup time. + * * It uses reflection in ways that break when code is processed by optimizers/minimizer like + * [R8 and Proguard](https://developer.android.com/studio/build/shrink-code). + * * It doesn't offer a way to wait for multiple events before taking action. For example, it + * doesn't offer a way to wait for multiple producers to all report that they're "ready," nor + * does it offer a way to batch multiple events from a single producer together. + * * It doesn't support backpressure and other features needed for resilience. + * * It doesn't provide much control of threading. + * * It doesn't offer much monitoring. + * * It doesn't propagate exceptions, so apps don't have a way to react to them. + * * It doesn't interoperate well with RxJava, coroutines, and other more commonly used + * alternatives. + * * It imposes requirements on the lifecycle of its subscribers. For example, if an event + * occurs between when one subscriber is removed and the next subscriber is added, the event + * is dropped. + * * Its performance is suboptimal, especially under Android. + * * It [doesn't support parameterized + * types](https://github.com/google/guava/issues/1431). + * * With the introduction of lambdas in Java 8, EventBus went from less verbose than listeners + * to [more verbose](https://github.com/google/guava/issues/3311). + * + * + *

EventBus Summary

+ * + * + * The EventBus allows publish-subscribe-style communication between components without requiring + * the components to explicitly register with one another (and thus be aware of each other). It is + * designed exclusively to replace traditional Java in-process event distribution using explicit + * registration. It is *not* a general-purpose publish-subscribe system, nor is it intended + * for interprocess communication. + * + *

Receiving Events

+ * + * + * To receive events, an object should: + * + * + * 1. Expose a public method, known as the *event subscriber*, which accepts a single + * argument of the type of event desired; + * 1. Mark it with a [Subscribe] annotation; + * 1. Pass itself to an EventBus instance's [.register] method. + * + * + *

Posting Events

+ * + * + * To post an event, simply provide the event object to the [.post] method. The + * EventBus instance will determine the type of event and route it to all registered listeners. + * + * + * Events are routed based on their type an event will be delivered to any subscriber for + * any type to which the event is *assignable.* This includes implemented interfaces, all + * superclasses, and all interfaces implemented by superclasses. + * + * + * When `post` is called, all registered subscribers for an event are run in sequence, so + * subscribers should be reasonably quick. If an event may trigger an extended process (such as a + * database load), spawn a thread or queue it for later. (For a convenient way to do this, use an + * [AsyncMessageBus].) + * + *

Subscriber Methods

+ * + * + * Event subscriber methods must accept only one argument: the event. + * + * + * Subscribers should not, in general, throw. If they do, the EventBus will catch and log the + * exception. This is rarely the right solution for error handling and should not be relied upon; it + * is intended solely to help find problems during development. + * + * + * The EventBus guarantees that it will not call a subscriber method from multiple threads + * simultaneously, unless the method explicitly allows it by bearing the [ ] annotation. If this annotation is not present, subscriber methods need not + * worry about being reentrant, unless also called from outside the EventBus. + * + *

Dead Events

+ * + * + * If an event is posted, but no registered subscribers can accept it, it is considered "dead." + * To give the system a second chance to handle dead events, they are wrapped in an instance of + * [DeadMessage] and reposted. + * + * + * If a subscriber for a supertype of all events (such as Object) is registered, no event will + * ever be considered dead, and no DeadEvents will be generated. Accordingly, while DeadMessage + * extends [Object], a subscriber registered to receive any Object will never receive a + * DeadMessage. + * + * + * This class is safe for concurrent use. + * + * + * See the Guava User Guide article on [`EventBus`](https://github.com/google/guava/wiki/EventBusExplained). + * + * @author Cliff + * @param identifier a brief name for this bus, for logging purposes. Should/home/alex/privacy/project/myths/gaea be a valid Java + * @param executor the default executor this event bus uses for dispatching events to subscribers. + * @param dispatcher message dispatcher. + * @param exceptionHandler Handler for subscriber exceptions. + */ +open class MessageBus internal constructor( + override val identifier: String, + override val executor: Executor, + val dispatcher: Dispatcher, + val exceptionHandler: SubscriberExceptionHandler, +) : IMessageBus { + val DEAD_TOPIC = "DEAD_TOPIC" + + val subscribers = TopicSubscriberRegistry(this) + + /** + * Creates a new EventBus with the given `identifier`. + * + * @param identifier a brief name for this bus, for logging purposes. Should/home/alex/privacy/project/myths/gaea be a valid Java + * identifier. + */ + @JvmOverloads + constructor(identifier: String = "default") : this( + identifier, + Executor { it.run() }, + Dispatcher.perThreadDispatchQueue(), + LoggingHandler() + ) + + /** + * Creates a new EventBus with the given [SubscriberExceptionHandler]. + * + * @param exceptionHandler Handler for subscriber exceptions. + * @since 16.0 + */ + constructor(exceptionHandler: SubscriberExceptionHandler) : this( + "default", + Executor { it.run() }, + Dispatcher.perThreadDispatchQueue(), + exceptionHandler + ) + + override fun register(topics: Array, subscriber: Any) { + subscribers.register(topics, subscriber) + } + + /** + * Registers subscriber method on `object` to receive messages. + * + * @param topics method subscribe topic. + * @param subscriber subscriber method declare object. + * @param method subscriber method should be registered. + */ + override fun register(topics: Array, subscriber: Any, method: Method) { + subscribers.register(topics, subscriber, method) + } + + + /** + * Registers all subscriber methods on `object` to receive messages. + * + * @param subscriber object whose subscriber methods should be registered. + */ + override fun register(subscriber: Any) { + subscribers.register(subscriber) + } + + override fun register(subscriber: Any, method: Method) { + subscribers.register(subscriber, method) + } + + /** + * Unregisters all subscriber methods on a registered `object`. + * + * @param subscriber object whose subscriber methods should be unregistered. + * @throws IllegalArgumentException if the object was not previously registered. + */ + override fun unregister(subscriber: Any) { + subscribers.unregister(subscriber) + } + + override fun unregister(topic: String, subscriber: Any) { + subscribers.unregister(topic, subscriber) + } + + /** + * Posts an message to all registered subscribers. This method will return successfully after the + * message has been posted to all subscribers, and regardless of any exceptions thrown by + * subscribers. + * + * @param message message to post. + */ + override fun publish(message: T) { + val messageSubscribers = subscribers.getSubscribers(message.javaClass.name) + if (messageSubscribers.hasNext()) { + dispatcher.dispatch(message, messageSubscribers) + } else { + // the message had no subscribers and was not itself a DeadMessage + publish(DEAD_TOPIC, message) + } + } + + override fun publish(topic: String, message: T) { + val messageSubscribers = subscribers.getSubscribers(topic) + if (messageSubscribers.hasNext()) { + dispatcher.dispatch(message, messageSubscribers) + } else if (topic != DEAD_TOPIC) { + // the message had no subscribers and was not itself a DeadMessage + publish(DEAD_TOPIC, message) + } + } + + + /** Handles the given exception thrown by a subscriber with the given context. */ + override fun handleException(cause: Throwable?, context: SubscriberExceptionContext) { + try { + exceptionHandler.handleException(cause, context) + } catch (e2: Throwable) { + // if the handler threw an exception... well, just log it + logger.log( + Level.SEVERE, String.format(Locale.ROOT, "Exception %s thrown while handling exception: %s", e2, cause), + e2 + ) + } + } + + override fun toString(): String { + return "MessageBus(identifier='$identifier', executor=$executor, dispatcher=$dispatcher, exceptionHandler=$exceptionHandler, subscribers=$subscribers)" + } + + companion object { + private val logger = Logger.getLogger(Bus::class.java.name) + } +} \ No newline at end of file diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/TopicSubscriberRegistry.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/TopicSubscriberRegistry.kt new file mode 100644 index 0000000..6c8d0f7 --- /dev/null +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/TopicSubscriberRegistry.kt @@ -0,0 +1,184 @@ +/* + * Copyright (C) 2014 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.synebula.gaea.bus.messagebus + +import com.synebula.gaea.bus.Subscribe +import com.synebula.gaea.bus.Subscriber +import com.synebula.gaea.bus.SubscriberRegistry +import java.lang.reflect.Method +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.CopyOnWriteArraySet + +/** + * Registry of subscribers to a single message bus. + * + * @author Colin Decker + */ +open class TopicSubscriberRegistry(private val bus: IMessageBus) : SubscriberRegistry(bus) { + /** + * All registered subscribers, indexed by message type. + * + * + * The [CopyOnWriteArraySet] values make it easy and relatively lightweight to get an + * immutable snapshot of all current subscribers to an message without any locking. + */ + private val subscribers = ConcurrentHashMap>>() + + open fun register(topics: Array, subscriber: Any) { + val listenerMethods = findAllSubscribers(subscriber) + for (topic in topics) { + for ((_, messageMethodsInListener) in listenerMethods) { + var messageSubscribers = subscribers[topic] + if (messageSubscribers == null) { + val newSet = CopyOnWriteArraySet>() + messageSubscribers = subscribers.putIfAbsent(topic, newSet) ?: newSet + } + messageSubscribers.addAll(messageMethodsInListener) + } + } + } + + open fun register(topics: Array, subscriber: Any, method: Method) { + for (topic in topics) { + var messageSubscribers = subscribers[topic] + if (messageSubscribers == null) { + val newSet = CopyOnWriteArraySet>() + messageSubscribers = subscribers.putIfAbsent(topic, newSet) ?: newSet + } + messageSubscribers.add(Subscriber.create(this.bus, subscriber, method)) + } + } + + /** Registers all subscriber methods on the given subscriber object. */ + override fun register(subscriber: Any) { + val listenerMethods = findAllSubscribers(subscriber) + for ((topic, messageMethodsInListener) in listenerMethods) { + var messageSubscribers = subscribers[topic] + if (messageSubscribers == null) { + val newSet = CopyOnWriteArraySet>() + messageSubscribers = subscribers.putIfAbsent(topic, newSet) ?: newSet + } + messageSubscribers.addAll(messageMethodsInListener) + } + } + + /** Registers subscriber method on the given subscriber object. */ + override fun register(subscriber: Any, method: Method) { + val parameterTypes = method.parameterTypes + check(parameterTypes.size == 1) { + "Method $method has @SubscribeTopic annotation but has ${parameterTypes.size} parameters. Subscriber methods must have exactly 1 parameter." + } + check(!parameterTypes[0].isPrimitive) { + "@SubscribeTopic method $method's parameter is ${parameterTypes[0].name}. Subscriber methods cannot accept primitives. " + } + + val eventType = parameterTypes[0] + val topic = eventType.name + var messageSubscribers = subscribers[topic] + if (messageSubscribers == null) { + val newSet = CopyOnWriteArraySet>() + messageSubscribers = subscribers.putIfAbsent(topic, newSet) ?: newSet + } + messageSubscribers.add(Subscriber.create(bus, subscriber, method)) + } + + /** Unregisters all subscribers on the given subscriber object. */ + override fun unregister(subscriber: Any) { + val listenerMethods = findAllSubscribers(subscriber) + for ((topic, listenerMethodsForType) in listenerMethods) { + val currentSubscribers = subscribers[topic] + require(!(currentSubscribers == null || !currentSubscribers.removeAll(listenerMethodsForType.toSet()))) { + // if removeAll returns true, all we really know is that at least one subscriber was + // removed... however, barring something very strange we can assume that if at least one + // subscriber was removed, all subscribers on subscriber for that message type were... after + // all, the definition of subscribers on a particular class is totally static + "missing message subscriber for an annotated method. Is $subscriber registered?" + } + + // don't try to remove the set if it's empty; that can't be done safely without a lock + // anyway, if the set is empty it'll just be wrapping an array of length 0 + } + } + + /** Unregisters all subscribers by topic on the given subscriber object. */ + open fun unregister(topic: String, subscriber: Any) { + val listenerMethods = findAllSubscribers(subscriber) + if (listenerMethods[topic] == null) return //不包含topic则推出 + val currentSubscribers = subscribers[topic] + require(!(currentSubscribers == null || !currentSubscribers.removeAll(listenerMethods[topic]!!.toSet()))) { + // if removeAll returns true, all we really know is that at least one subscriber was + // removed... however, barring something very strange we can assume that if at least one + // subscriber was removed, all subscribers on subscriber for that message type were... after + // all, the definition of subscribers on a particular class is totally static + "missing message subscriber for an annotated method. Is $subscriber registered?" + } + + // don't try to remove the set if it's empty; that can't be done safely without a lock + // anyway, if the set is empty it'll just be wrapping an array of length 0 + } + + + /** + * Gets an iterator representing an immutable snapshot of all subscribers to the given message at + * the time this method is called. + */ + fun getSubscribers(topic: String): Iterator> { + val topicSubscribers: CopyOnWriteArraySet> = subscribers[topic] ?: CopyOnWriteArraySet() + return topicSubscribers.iterator() + } + + /** + * Returns all subscribers for the given subscriber grouped by the type of message they subscribe to. + */ + private fun findAllSubscribers(subscriber: Any): Map>> { + val methodsInListener = mutableMapOf>>() + val clazz: Class<*> = subscriber.javaClass + for (method in getAnnotatedMethods(clazz)) { + var topics = method.getAnnotation(Subscribe::class.java).topics + + //如果没有定义topic,则使用消息类名称做topic + if (topics.isEmpty()) { + val parameterTypes = method.parameterTypes + val messageType = parameterTypes[0] + topics = arrayOf(messageType.name) + } + for (topic in topics) { + val listeners = methodsInListener.getOrDefault(topic, mutableListOf()) + listeners.add(Subscriber.create(bus, subscriber, method)) + methodsInListener[topic] = listeners + } + } + return methodsInListener + } + + + class MethodIdentifier(method: Method) { + private val name: String = method.name + private val parameterTypes: List> = listOf(*method.parameterTypes) + + override fun hashCode(): Int { + var result = name.hashCode() + for (element in parameterTypes) result = 31 * result + element.hashCode() + return result + } + + override fun equals(other: Any?): Boolean { + if (other is MethodIdentifier) { + return name == other.name && parameterTypes == other.parameterTypes + } + return false + } + } + +} \ No newline at end of file diff --git a/src/gaea/src/test/kotlin/com/synebula/gaea/bus/BusTest.kt b/src/gaea/src/test/kotlin/com/synebula/gaea/bus/BusTest.kt new file mode 100644 index 0000000..0022320 --- /dev/null +++ b/src/gaea/src/test/kotlin/com/synebula/gaea/bus/BusTest.kt @@ -0,0 +1,94 @@ +package com.synebula.gaea.bus + +import com.synebula.gaea.bus.messagebus.AsyncMessageBus +import com.synebula.gaea.bus.messagebus.IMessageBus +import com.synebula.gaea.bus.messagebus.MessageBus +import org.junit.Test +import java.util.concurrent.Executors + + +class BusTest { + + @Test + fun testBus() { + val bus: IBus = Bus() + val subscriber = TestSubscriber() + bus.register(subscriber) + bus.publish("Hello world") + bus.publish(subscriber) + bus.publish(1) + } + + @Test + fun testAsyncBus() { + val bus: IBus = AsyncBus(Executors.newFixedThreadPool(10)) + val subscriber = TestSubscriber() + bus.register(subscriber, subscriber.javaClass.declaredMethods[0]) + bus.register(subscriber, subscriber.javaClass.declaredMethods[1]) + bus.publish("Hello world") + bus.publish(subscriber) + } + + @Test + fun testMessageBus() { + val bus: IMessageBus = MessageBus() + val subscriber = TestTopicSubscriber() + bus.register(subscriber) + bus.publish("hello", "Hello world") + bus.publish("whoami", subscriber) + } + + @Test + fun testMessageBus2() { + val bus: IMessageBus = AsyncMessageBus(Executors.newFixedThreadPool(10)) + val subscriber = TestTopicSubscriber() + val subscriber2 = TestTopicSubscriber2() + bus.register(subscriber) + bus.register(subscriber2) + bus.publish("hello", "Hello world") + bus.publish("whoami", subscriber) + + bus.unregister(subscriber2) + bus.publish("hello", "Hello world") + bus.publish("whoami", subscriber) + } + + internal class TestSubscriber { + @Subscribe + fun echo(name: String) { + println(name) + } + + @Subscribe + fun whoAmI(testSubscriber: TestSubscriber) { + println(testSubscriber.javaClass.name) + } + + } + + internal class TestTopicSubscriber { + @Subscribe(topics = ["hello"]) + fun echo(name: String) { + println(name) + } + + @Subscribe(topics = ["whoami"]) + fun whoAmI(testSubscriber: TestTopicSubscriber) { + println(testSubscriber.javaClass.name) + } + + } + + internal class TestTopicSubscriber2 { + @Subscribe(topics = ["hello"]) + fun echo(name: String) { + println("2 $name") + } + + @Subscribe(topics = ["whoami"]) + fun whoAmI(testSubscriber: TestTopicSubscriber) { + println("2 ${testSubscriber.javaClass.name}") + } + + } +} \ No newline at end of file