add bus code

This commit is contained in:
2022-07-27 10:40:24 +08:00
parent 072c48f888
commit 7692fb502b
19 changed files with 1692 additions and 31 deletions

View File

@@ -0,0 +1,28 @@
/*
* Copyright (C) 2007 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.synebula.gaea.bus
/**
* Marks an message subscriber method as being thread-safe. This annotation indicates that MessageBus
* may invoke the message subscriber simultaneously from multiple threads.
*
*
* This does not mark the method, and so should be used in combination with [Subscribe].
*
* @author Cliff
* @since 10.0
*/
@Retention(AnnotationRetention.RUNTIME)
@Target(AnnotationTarget.FUNCTION, AnnotationTarget.PROPERTY_GETTER, AnnotationTarget.PROPERTY_SETTER)
annotation class AllowConcurrentSubscribe

View File

@@ -0,0 +1,68 @@
/*
* Copyright (C) 2007 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.synebula.gaea.bus
import java.util.concurrent.Executor
/**
* An [Bus] that takes the Executor of your choice and uses it to dispatch messages,
* allowing dispatch to occur asynchronously.
*
* @author Cliff
* @since 10.0
*/
class AsyncBus : Bus {
/**
* Creates a new AsyncMessageBus that will use `executor` to dispatch messages. Assigns `identifier` as the bus's name for logging purposes.
*
* @param identifier short name for the bus, for logging purposes.
* @param executor Executor to use to dispatch messages. It is the caller's responsibility to shut
* down the executor after the last message has been posted to this message bus.
*/
constructor(identifier: String, executor: Executor) : super(
identifier,
executor,
Dispatcher.legacyAsync(),
LoggingHandler()
)
/**
* Creates a new AsyncMessageBus that will use `executor` to dispatch messages.
*
* @param executor Executor to use to dispatch messages. It is the caller's responsibility to shut
* down the executor after the last message has been posted to this message bus.
* @param subscriberExceptionHandler Handler used to handle exceptions thrown from subscribers.
* See [SubscriberExceptionHandler] for more information.
* @since 16.0
*/
constructor(executor: Executor, subscriberExceptionHandler: SubscriberExceptionHandler<Any>) : super(
"default",
executor,
Dispatcher.legacyAsync(),
subscriberExceptionHandler
)
/**
* Creates a new AsyncMessageBus that will use `executor` to dispatch messages.
*
* @param executor Executor to use to dispatch messages. It is the caller's responsibility to shut
* down the executor after the last message has been posted to this message bus.
*/
constructor(executor: Executor) : super(
"default",
executor,
Dispatcher.legacyAsync(),
LoggingHandler()
)
}

View File

@@ -0,0 +1,254 @@
/*
* Copyright (C) 2007 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.synebula.gaea.bus
import java.lang.reflect.Method
import java.util.*
import java.util.concurrent.Executor
import java.util.logging.Level
import java.util.logging.Logger
/**
* Dispatches messages to listeners, and provides ways for listeners to register themselves.
*
* <h2>Avoid MessageBus</h2>
*
*
* **We recommend against using MessageBus.** It was designed many years ago, and newer
* libraries offer better ways to decouple components and react to messages.
*
*
* To decouple components, we recommend a dependency-injection framework. For Android code, most
* apps use [Dagger](https://dagger.dev). For server code, common options include [Guice](https://github.com/google/guice/wiki/Motivation) and [Spring](https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-introduction).
* Frameworks typically offer a way to register multiple listeners independently and then request
* them together as a set ([Dagger](https://dagger.dev/dev-guide/multibindings), [Guice](https://github.com/google/guice/wiki/Multibindings), [Spring](https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-autowired-annotation)).
*
*
* To react to messages, we recommend a reactive-streams framework like [RxJava](https://github.com/ReactiveX/RxJava/wiki) (supplemented with its [RxAndroid](https://github.com/ReactiveX/RxAndroid) extension if you are building for
* Android) or [Project Reactor](https://projectreactor.io/). (For the basics of
* translating code from using an message bus to using a reactive-streams framework, see these two
* guides: [1](https://blog.jkl.gg/implementing-an-message-bus-with-rxjava-rxbus/), [2](https://lorentzos.com/rxjava-as-message-bus-the-right-way-10a36bdd49ba).) Some usages
* of MessageBus may be better written using [Kotlin coroutines](https://kotlinlang.org/docs/coroutines-guide.html), including [Flow](https://kotlinlang.org/docs/flow.html) and [Channels](https://kotlinlang.org/docs/channels.html). Yet other usages are better served
* by individual libraries that provide specialized support for particular use cases.
*
*
* Disadvantages of MessageBus include:
*
*
* * It makes the cross-references between producer and subscriber harder to find. This can
* complicate debugging, lead to unintentional reentrant calls, and force apps to eagerly
* initialize all possible subscribers at startup time.
* * It uses reflection in ways that break when code is processed by optimizers/minimizer like
* [R8 and Proguard](https://developer.android.com/studio/build/shrink-code).
* * It doesn't offer a way to wait for multiple messages before taking action. For example, it
* doesn't offer a way to wait for multiple producers to all report that they're "ready," nor
* does it offer a way to batch multiple messages from a single producer together.
* * It doesn't support backpressure and other features needed for resilience.
* * It doesn't provide much control of threading.
* * It doesn't offer much monitoring.
* * It doesn't propagate exceptions, so apps don't have a way to react to them.
* * It doesn't interoperate well with RxJava, coroutines, and other more commonly used
* alternatives.
* * It imposes requirements on the lifecycle of its subscribers. For example, if an message
* occurs between when one subscriber is removed and the next subscriber is added, the message
* is dropped.
* * Its performance is suboptimal, especially under Android.
* * It [doesn't support parameterized
* types](https://github.com/google/guava/issues/1431).
* * With the introduction of lambdas in Java 8, MessageBus went from less verbose than listeners
* to [more verbose](https://github.com/google/guava/issues/3311).
*
*
* <h2>MessageBus Summary</h2>
*
*
* The MessageBus allows publish-subscribe-style communication between components without requiring
* the components to explicitly register with one another (and thus be aware of each other). It is
* designed exclusively to replace traditional Java in-process message distribution using explicit
* registration. It is *not* a general-purpose publish-subscribe system, nor is it intended
* for interprocess communication.
*
* <h2>Receiving Messages</h2>
*
*
* To receive messages, an object should:
*
*
* 1. Expose a public method, known as the *message subscriber*, which accepts a single
* argument of the type of message desired;
* 1. Mark it with a [Subscribe] annotation;
* 1. Pass itself to an MessageBus instance's [.register] method.
*
*
* <h2>Posting Messages</h2>
*
*
* To post an message, simply provide the message object to the [.post] method. The
* MessageBus instance will determine the type of message and route it to all registered listeners.
*
*
* Messages are routed based on their type an message will be delivered to any subscriber for
* any type to which the message is *assignable.* This includes implemented interfaces, all
* superclasses, and all interfaces implemented by superclasses.
*
*
* When `post` is called, all registered subscribers for an message are run in sequence, so
* subscribers should be reasonably quick. If an message may trigger an extended process (such as a
* database load), spawn a thread or queue it for later. (For a convenient way to do this, use an
* [AsyncBus].)
*
* <h2>Subscriber Methods</h2>
*
*
* Message subscriber methods must accept only one argument: the message.
*
*
* Subscribers should not, in general, throw. If they do, the MessageBus will catch and log the
* exception. This is rarely the right solution for error handling and should not be relied upon; it
* is intended solely to help find problems during development.
*
*
* The MessageBus guarantees that it will not call a subscriber method from multiple threads
* simultaneously, unless the method explicitly allows it by bearing the [ ] annotation. If this annotation is not present, subscriber methods need not
* worry about being reentrant, unless also called from outside the MessageBus.
*
* <h2>Dead Messages</h2>
*
*
* If an message is posted, but no registered subscribers can accept it, it is considered "dead."
* To give the system a second chance to handle dead messages, they are wrapped in an instance of
* [DeadMessage] and reposted.
*
*
* If a subscriber for a supertype of all messages (such as Object) is registered, no message will
* ever be considered dead, and no DeadMessages will be generated. Accordingly, while DeadMessage
* extends [Object], a subscriber registered to receive any Object will never receive a
* DeadMessage.
*
*
* This class is safe for concurrent use.
*
*
* See the Guava User Guide article on [`MessageBus`](https://github.com/google/guava/wiki/MessageBusExplained).
*
* @author Cliff
* @since 10.0
* @param identifier a brief name for this bus, for logging purposes. Should/home/alex/privacy/project/myths/gaea be a valid Java
* @param executor the default executor this event bus uses for dispatching events to subscribers.
* @param dispatcher message dispatcher.
* @param exceptionHandler Handler for subscriber exceptions.
*/
open class Bus(
override val identifier: String,
override val executor: Executor,
val dispatcher: Dispatcher<Any>,
val exceptionHandler: SubscriberExceptionHandler<Any>,
) : IBus<Any> {
private val subscribers: SubscriberRegistry<Any> = SubscriberRegistry(this)
/**
* Creates a new MessageBus with the given `identifier`.
*
* @param identifier a brief name for this bus, for logging purposes. Should/home/alex/privacy/project/myths/gaea be a valid Java
* identifier.
*/
@JvmOverloads
constructor(identifier: String = "default") : this(
identifier,
Executor { it.run() },
Dispatcher.perThreadDispatchQueue(),
LoggingHandler()
)
/**
* Creates a new MessageBus with the given [SubscriberExceptionHandler].
*
* @param exceptionHandler Handler for subscriber exceptions.
* @since 16.0
*/
constructor(exceptionHandler: SubscriberExceptionHandler<Any>) : this(
"default",
Executor { it.run() },
Dispatcher.perThreadDispatchQueue(),
exceptionHandler
)
/**
* Registers all subscriber methods on `object` to receive messages.
*
* @param subscriber object whose subscriber methods should be registered.
*/
override fun register(subscriber: Any) {
subscribers.register(subscriber)
}
override fun register(subscriber: Any, method: Method) {
subscribers.register(subscriber, method)
}
/**
* Unregisters all subscriber methods on a registered `object`.
*
* @param subscriber object whose subscriber methods should be unregistered.
* @throws IllegalArgumentException if the object was not previously registered.
*/
override fun unregister(subscriber: Any) {
subscribers.unregister(subscriber)
}
/**
* Posts an message to all registered subscribers. This method will return successfully after the
* message has been posted to all subscribers, and regardless of any exceptions thrown by
* subscribers.
*
*
* If no subscribers have been subscribed for `message`'s class, and `message` is not
* already a [DeadMessage], it will be wrapped in a DeadMessage and reposted.
*
* @param message message to post.
*/
override fun publish(message: Any) {
val messageSubscribers = subscribers.getSubscribers(message)
if (messageSubscribers.hasNext()) {
dispatcher.dispatch(message, messageSubscribers)
} else if (message !is DeadMessage) {
// the message had no subscribers and was not itself a DeadMessage
publish(DeadMessage(this, message))
}
}
/** Handles the given exception thrown by a subscriber with the given context. */
override fun handleException(cause: Throwable?, context: SubscriberExceptionContext<Any>) {
try {
exceptionHandler.handleException(cause, context)
} catch (e2: Throwable) {
// if the handler threw an exception... well, just log it
logger.log(
Level.SEVERE, String.format(Locale.ROOT, "Exception %s thrown while handling exception: %s", e2, cause),
e2
)
}
}
override fun toString(): String {
return "Bus(identifier='$identifier', executor=$executor, dispatcher=$dispatcher, exceptionHandler=$exceptionHandler, subscribers=$subscribers)"
}
companion object {
private val logger = Logger.getLogger(Bus::class.java.name)
}
}

View File

@@ -0,0 +1,36 @@
/*
* Copyright (C) 2007 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.synebula.gaea.bus
/**
* Wraps an message that was posted, but which had no subscribers and thus could not be delivered.
*
*
* Registering a DeadMessage subscriber is useful for debugging or logging, as it can detect
* misconfigurations in a system's message distribution.
*
* @author Cliff
* @since 10.0
*
* Creates a new DeadMessage.
*
* @param source object broadcasting the DeadMessage (generally the [Bus]).
* @param message the message that could not be delivered.
*/
class DeadMessage(val source: Any?, val message: Any?) {
override fun toString(): String {
return "DeadMessage(source=$source, message=$message)"
}
}

View File

@@ -0,0 +1,151 @@
/*
* Copyright (C) 2014 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.synebula.gaea.bus
import java.util.*
import java.util.concurrent.ConcurrentLinkedQueue
/**
* Handler for dispatching messages to subscribers, providing different message ordering guarantees that
* make sense for different situations.
*
*
* **Note:** The dispatcher is orthogonal to the subscriber's `Executor`. The dispatcher
* controls the order in which messages are dispatched, while the executor controls how (i.e. on which
* thread) the subscriber is actually called when an message is dispatched to it.
*
* @author Colin Decker
*/
abstract class Dispatcher<T : Any> {
/** Dispatches the given `message` to the given `subscribers`. */
abstract fun dispatch(message: T, subscribers: Iterator<Subscriber<T>>?)
/** Implementation of a [.perThreadDispatchQueue] dispatcher. */
private class PerThreadQueuedDispatcher<T : Any> : Dispatcher<T>() {
// This dispatcher matches the original dispatch behavior of MessageBus.
/** Per-thread queue of messages to dispatch. */
private val queue: ThreadLocal<Queue<Message<T>>> = object : ThreadLocal<Queue<Message<T>>>() {
override fun initialValue(): Queue<Message<T>> {
return ArrayDeque()
}
}
/** Per-thread dispatch state, used to avoid reentrant message dispatching. */
private val dispatching: ThreadLocal<Boolean> = object : ThreadLocal<Boolean>() {
override fun initialValue(): Boolean {
return false
}
}
override fun dispatch(message: T, subscribers: Iterator<Subscriber<T>>?) {
val queueForThread = queue.get()
queueForThread.offer(Message(message, subscribers))
if (!dispatching.get()) {
dispatching.set(true)
try {
var nextMessage: Message<T>?
while (queueForThread.poll().also { nextMessage = it } != null) {
while (nextMessage!!.subscribers!!.hasNext()) {
nextMessage!!.subscribers!!.next().dispatchMessage(nextMessage!!.message)
}
}
} finally {
dispatching.remove()
queue.remove()
}
}
}
private class Message<T : Any>(val message: Any, val subscribers: Iterator<Subscriber<T>>?)
}
/** Implementation of a [.legacyAsync] dispatcher. */
private class LegacyAsyncDispatcher<T : Any> : Dispatcher<T>() {
// This dispatcher matches the original dispatch behavior of AsyncMessageBus.
//
// We can't really make any guarantees about the overall dispatch order for this dispatcher in
// a multithreaded environment for a couple reasons:
//
// 1. Subscribers to messages posted on different threads can be interleaved with each other
// freely. (A message on one thread, B message on another could yield any of
// [a1, a2, a3, b1, b2], [a1, b2, a2, a3, b2], [a1, b2, b3, a2, a3], etc.)
// 2. It's possible for subscribers to actually be dispatched to in a different order than they
// were added to the queue. It's easily possible for one thread to take the head of the
// queue, immediately followed by another thread taking the next element in the queue. That
// second thread can then dispatch to the subscriber it took before the first thread does.
//
// All this makes me really wonder if there's any value in queueing here at all. A dispatcher
// that simply loops through the subscribers and dispatches the message to each would actually
// probably provide a stronger order guarantee, though that order would obviously be different
// in some cases.
/** Global message queue. */
private val queue = ConcurrentLinkedQueue<MessageWithSubscriber<T>>()
override fun dispatch(message: T, subscribers: Iterator<Subscriber<T>>?) {
while (subscribers!!.hasNext()) {
queue.add(MessageWithSubscriber(message, subscribers.next()))
}
var e: MessageWithSubscriber<T>?
while (queue.poll().also { e = it } != null) {
e!!.subscriber!!.dispatchMessage(e!!.message)
}
}
private class MessageWithSubscriber<T : Any>(val message: T, val subscriber: Subscriber<T>?)
}
/** Implementation of [.immediate]. */
private class ImmediateDispatcher<T : Any> : Dispatcher<T>() {
override fun dispatch(message: T, subscribers: Iterator<Subscriber<T>>?) {
while (subscribers!!.hasNext()) {
subscribers.next().dispatchMessage(message)
}
}
}
companion object {
/**
* Returns a dispatcher that queues messages that are posted reentrantly on a thread that is already
* dispatching an message, guaranteeing that all messages posted on a single thread are dispatched to
* all subscribers in the order they are posted.
*
*
* When all subscribers are dispatched to using a *direct* executor (which dispatches on
* the same thread that posts the message), this yields a breadth-first dispatch order on each
* thread. That is, all subscribers to a single message A will be called before any subscribers to
* any messages B and C that are posted to the message bus by the subscribers to A.
*/
fun <T : Any> perThreadDispatchQueue(): Dispatcher<T> {
return PerThreadQueuedDispatcher()
}
/**
* Returns a dispatcher that queues messages that are posted in a single global queue. This behavior
* matches the original behavior of AsyncMessageBus exactly, but is otherwise not especially useful.
* For async dispatch, an [immediate][.immediate] dispatcher should generally be
* preferable.
*/
fun <T : Any> legacyAsync(): Dispatcher<T> {
return LegacyAsyncDispatcher()
}
/**
* Returns a dispatcher that dispatches messages to subscribers immediately as they're posted
* without using an intermediate queue to change the dispatch order. This is effectively a
* depth-first dispatch order, vs. breadth-first when using a queue.
*/
fun <T : Any> immediate(): Dispatcher<T> {
return ImmediateDispatcher()
}
}
}

View File

@@ -0,0 +1,41 @@
package com.synebula.gaea.bus
import java.lang.reflect.Method
import java.util.concurrent.Executor
interface IBus<T : Any> {
val identifier: String
val executor: Executor
/**
* 注册事件Listener
* @param subscriber subscriber所在类
*/
fun register(subscriber: Any)
/**
* 取消注册事件Listener
* @param subscriber Listener所在类
*/
fun register(subscriber: Any, method: Method)
/**
* 取消注册事件Listener
* @param subscriber Listener所在类
*/
fun unregister(subscriber: Any)
/**
* 同步发布事件
* @param message 事件
*/
fun publish(message: T)
/**
* 异常处理
*/
fun handleException(cause: Throwable?, context: SubscriberExceptionContext<T>)
}

View File

@@ -1,3 +0,0 @@
package com.synebula.gaea.bus
interface IMessage

View File

@@ -1,28 +0,0 @@
package com.synebula.gaea.bus
interface IMessageBus {
/**
* 注册事件Listener
* @param obj Listener所在类
*/
fun register(obj: Any)
/**
* 取消注册事件Listener
* @param obj Listener所在类
*/
fun unregister(obj: Any)
/**
* 同步发布事件
* @param message 事件
*/
fun publish(message: IMessage)
/**
* 异步发布事件
* @param message 事件
*/
fun publishAsync(message: IMessage)
}

View File

@@ -0,0 +1,34 @@
package com.synebula.gaea.bus
import java.util.logging.Level
import java.util.logging.Logger
/** Simple logging handler for subscriber exceptions. */
internal class LoggingHandler<T : Any> : SubscriberExceptionHandler<T> {
override fun handleException(exception: Throwable?, context: SubscriberExceptionContext<T>) {
val logger = logger(context)
if (logger.isLoggable(Level.SEVERE)) {
logger.log(Level.SEVERE, message(context), exception)
}
}
companion object {
private fun <T : Any> logger(context: SubscriberExceptionContext<T>): Logger {
return Logger.getLogger(Bus::class.java.name + "." + context.bus.identifier)
}
private fun <T : Any> message(context: SubscriberExceptionContext<T>): String {
val method = context.subscriberMethod
return ("Exception thrown by subscriber method "
+ method.name
+ '('
+ method.parameterTypes[0].name
+ ')'
+ " on subscriber "
+ context.subscriber
+ " when dispatching message: "
+ context.message)
}
}
}

View File

@@ -0,0 +1,36 @@
/*
* Copyright (C) 2007 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.synebula.gaea.bus
/**
* Marks a method as a message subscriber.
*
*
* The type of message will be indicated by the method's first (and only) parameter, which cannot
* be primitive. If this annotation is applied to methods with zero parameters, or more than one
* parameter, the object containing the method will not be able to register for message delivery from
* the [Bus].
*
*
* Unless also annotated with @[AllowConcurrentSubscribe], message subscriber methods will be
* invoked serially by each message bus that they are registered with.
*
* @author Cliff
* @since 10.0
*
* @param topics method subscribe topics, only for [com.synebula.gaea.bus.messagebus.MessageBus]
*/
@Retention(AnnotationRetention.RUNTIME)
@Target(AnnotationTarget.FUNCTION, AnnotationTarget.PROPERTY_GETTER, AnnotationTarget.PROPERTY_SETTER)
annotation class Subscribe(val topics: Array<String> = [])

View File

@@ -0,0 +1,128 @@
/*
* Copyright (C) 2014 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.synebula.gaea.bus
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method
import java.util.concurrent.Executor
/**
* A subscriber method on a specific object, plus the executor that should be used for dispatching
* messages to it.
*
*
* Two subscribers are equivalent when they refer to the same method on the same object (not
* class). This property is used to ensure that no subscriber method is registered more than once.
*
* @author Colin Decker
*
* @param bus The message bus this subscriber belongs to.
* @param target The object with the subscriber method.
* @param method Subscriber method.
*/
open class Subscriber<T : Any> private constructor(
private val bus: IBus<T>,
val target: Any,
val method: Method,
) {
/** Executor to use for dispatching messages to this subscriber. */
private val executor: Executor?
init {
method.isAccessible = true
executor = bus.executor
}
/** Dispatches `message` to this subscriber using the proper executor. */
fun dispatchMessage(message: Any) {
executor!!.execute {
try {
invokeSubscriberMethod(message)
} catch (e: InvocationTargetException) {
bus.handleException(e.cause, context(message))
}
}
}
/**
* Invokes the subscriber method. This method can be overridden to make the invocation
* synchronized.
*/
@Throws(InvocationTargetException::class)
open fun invokeSubscriberMethod(message: Any) {
try {
method.invoke(target, message)
} catch (e: IllegalArgumentException) {
throw Error("Method rejected target/argument: $message", e)
} catch (e: IllegalAccessException) {
throw Error("Method became inaccessible: $message", e)
} catch (e: InvocationTargetException) {
if (e.cause is Error) {
throw (e.cause as Error?)!!
}
throw e
}
}
/** Gets the context for the given message. */
private fun context(message: Any): SubscriberExceptionContext<T> {
return SubscriberExceptionContext(bus, message, target, method)
}
override fun hashCode(): Int {
return (31 + method.hashCode()) * 31 + System.identityHashCode(target)
}
override fun equals(other: Any?): Boolean {
if (other is Subscriber<*>) {
// Use == so that different equal instances will still receive messages.
// We only guard against the case that the same object is registered
// multiple times
return target === other.target && method == other.method
}
return false
}
/**
* Subscriber that synchronizes invocations of a method to ensure that only one thread may enter
* the method at a time.
*/
internal class SynchronizedSubscriber<T : Any>(bus: IBus<T>, target: Any, method: Method) :
Subscriber<T>(bus, target, method) {
@Throws(InvocationTargetException::class)
override fun invokeSubscriberMethod(message: Any) {
synchronized(this) { super.invokeSubscriberMethod(message) }
}
}
companion object {
/** Creates a `Subscriber` for `method` on `subscriber`. */
fun <T : Any> create(bus: IBus<T>, subscriber: Any, method: Method): Subscriber<T> {
return if (isDeclaredThreadSafe(method)) Subscriber(
bus,
subscriber,
method
) else SynchronizedSubscriber(
bus, subscriber, method
)
}
/**
* Checks whether `method` is thread-safe, as indicated by the presence of the [ ] annotation.
*/
private fun isDeclaredThreadSafe(method: Method): Boolean {
return method.getAnnotation(AllowConcurrentSubscribe::class.java) != null
}
}
}

View File

@@ -0,0 +1,31 @@
/*
* Copyright (C) 2013 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.synebula.gaea.bus
import java.lang.reflect.Method
/**
* Context for an exception thrown by a subscriber.
*
* @since 16.0
*
* @param bus The [IBus] that handled the message and the subscriber. Useful for
* broadcasting a new message based on the error.
* @param message The message object that caused the subscriber to throw.
* @param subscriber The source subscriber context.
* @param subscriberMethod the subscribed method.
*/
class SubscriberExceptionContext<T : Any> internal constructor(
val bus: IBus<T>, val message: Any, val subscriber: Any, val subscriberMethod: Method,
)

View File

@@ -0,0 +1,24 @@
/*
* Copyright (C) 2013 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.synebula.gaea.bus
/**
* Handler for exceptions thrown by message subscribers.
*
* @since 16.0
*/
interface SubscriberExceptionHandler<T : Any> {
/** Handles exceptions thrown by subscribers. */
fun handleException(exception: Throwable?, context: SubscriberExceptionContext<T>)
}

View File

@@ -0,0 +1,193 @@
/*
* Copyright (C) 2014 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.synebula.gaea.bus
import com.synebula.gaea.reflect.Types
import java.lang.reflect.Method
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArraySet
/**
* Registry of subscribers to a single message bus.
*
* @author Colin Decker
*/
open class SubscriberRegistry<T : Any>(private val bus: IBus<T>) {
/**
* All registered subscribers, indexed by message type.
*
*
* The [CopyOnWriteArraySet] values make it easy and relatively lightweight to get an
* immutable snapshot of all current subscribers to a message without any locking.
*/
private val subscribers = ConcurrentHashMap<Class<*>, CopyOnWriteArraySet<Subscriber<T>>>()
/** Registers all subscriber methods on the given subscriber object. */
open fun register(subscriber: Any) {
val listenerMethods = findAllSubscribers(subscriber)
for ((eventType, messageMethodsInListener) in listenerMethods) {
var messageSubscribers = subscribers[eventType]
if (messageSubscribers == null) {
val newSet = CopyOnWriteArraySet<Subscriber<T>>()
messageSubscribers = subscribers.putIfAbsent(eventType, newSet) ?: newSet
}
messageSubscribers.addAll(messageMethodsInListener)
}
}
/** Registers subscriber method on the given subscriber object. */
open fun register(subscriber: Any, method: Method) {
val parameterTypes = method.parameterTypes
val eventType = parameterTypes[0]
check(parameterTypes.size == 1) {
"Method $method has @SubscribeTopic annotation but has ${parameterTypes.size} parameters. Subscriber methods must have exactly 1 parameter."
}
check(!parameterTypes[0].isPrimitive) {
"@SubscribeTopic method $method's parameter is ${parameterTypes[0].name}. Subscriber methods cannot accept primitives. "
}
var messageSubscribers = subscribers[eventType]
if (messageSubscribers == null) {
val newSet = CopyOnWriteArraySet<Subscriber<T>>()
messageSubscribers = subscribers.putIfAbsent(eventType, newSet) ?: newSet
}
messageSubscribers.add(Subscriber.create(bus, subscriber, method))
}
/** Unregisters all subscribers on the given subscriber object. */
open fun unregister(subscriber: Any) {
val listenerMethods = findAllSubscribers(subscriber)
for ((eventType, listenerMethodsForType) in listenerMethods) {
val currentSubscribers = subscribers[eventType]
require(!(currentSubscribers == null || !currentSubscribers.removeAll(listenerMethodsForType.toSet()))) {
// if removeAll returns true, all we really know is that at least one subscriber was
// removed... however, barring something very strange we can assume that if at least one
// subscriber was removed, all subscribers on subscriber for that message type were... after
// all, the definition of subscribers on a particular class is totally static
"missing message subscriber for an annotated method. Is $subscriber registered?"
}
// don't try to remove the set if it's empty; that can't be done safely without a lock
// anyway, if the set is empty it'll just be wrapping an array of length 0
}
}
/**
* Gets an iterator representing an immutable snapshot of all subscribers to the given message at
* the time this method is called.
*/
fun getSubscribers(eventType: Any): Iterator<Subscriber<T>> {
val eventSubscribers: CopyOnWriteArraySet<Subscriber<T>> =
subscribers[eventType.javaClass] ?: CopyOnWriteArraySet()
return eventSubscribers.iterator()
}
/**
* Returns all subscribers for the given subscriber grouped by the type of message they subscribe to.
*/
private fun findAllSubscribers(subscriber: Any): Map<Class<*>, List<Subscriber<T>>> {
val methodsInListener = mutableMapOf<Class<*>, List<Subscriber<T>>>()
val clazz: Class<*> = subscriber.javaClass
for (method in getAnnotatedMethods(clazz)) {
val parameterTypes = method.parameterTypes
val eventType = parameterTypes[0]
val methods = methodsInListener[eventType]?.toMutableList() ?: mutableListOf()
methods.add(Subscriber.create(bus, subscriber, method))
methodsInListener[eventType] = methods
}
return methodsInListener.toMap()
}
class MethodIdentifier(method: Method) {
private val name: String = method.name
private val parameterTypes: List<Class<*>> = listOf(*method.parameterTypes)
override fun hashCode(): Int {
var result = name.hashCode()
for (element in parameterTypes) result = 31 * result + element.hashCode()
return result
}
override fun equals(other: Any?): Boolean {
if (other is MethodIdentifier) {
return name == other.name && parameterTypes == other.parameterTypes
}
return false
}
}
/**
* A thread-safe cache that contains the mapping from each class to all methods in that class and
* all super-classes, that are annotated with `@Subscribe`. The cache is shared across all
* instances of this class; this greatly improves performance if multiple MessageBus instances are
* created and objects of the same class are registered on all of them.
*/
protected val subscriberMethodsCache = mapOf<Class<*>, List<Method>>()
@Synchronized
protected fun getAnnotatedMethods(clazz: Class<*>): List<Method> {
var methods = subscriberMethodsCache[clazz]
if (methods == null)
methods = getAnnotatedMethodsNotCached(clazz, Subscribe::class.java)
return methods
}
protected fun getAnnotatedMethodsNotCached(
clazz: Class<*>,
annotationClass: Class<out Annotation>,
): List<Method> {
val supertypes = flattenHierarchy(clazz)
val identifiers = mutableMapOf<MethodIdentifier, Method>()
for (supertype in supertypes) {
for (method in supertype.declaredMethods) {
if (method.isAnnotationPresent(annotationClass) && !method.isSynthetic) {
val parameterTypes = method.parameterTypes
check(parameterTypes.size == 1) {
"Method $method has @SubscribeTopic annotation but has ${parameterTypes.size} parameters. Subscriber methods must have exactly 1 parameter."
}
check(!parameterTypes[0].isPrimitive) {
"@SubscribeTopic method $method's parameter is ${parameterTypes[0].name}. Subscriber methods cannot accept primitives. "
}
val identifier = MethodIdentifier(method)
if (!identifiers.containsKey(identifier)) {
identifiers[identifier] = method
}
}
}
}
return identifiers.values.toList()
}
companion object {
/** Global cache of classes to their flattened hierarchy of supertypes. */
protected val flattenHierarchyCache = mutableMapOf<Class<*>, Set<Class<*>>>()
/**
* Flattens a class's type hierarchy into a set of `Class` objects including all
* superclasses (transitively) and all interfaces implemented by these superclasses.
*/
fun flattenHierarchy(clazz: Class<*>): Set<Class<*>> {
var supertypes = flattenHierarchyCache[clazz]
if (supertypes == null) {
supertypes = Types.supertypes(clazz)
flattenHierarchyCache[clazz] = supertypes
}
return supertypes
}
}
}

View File

@@ -0,0 +1,71 @@
/*
* Copyright (C) 2007 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.synebula.gaea.bus.messagebus
import com.synebula.gaea.bus.Dispatcher
import com.synebula.gaea.bus.LoggingHandler
import com.synebula.gaea.bus.SubscriberExceptionHandler
import java.util.concurrent.Executor
/**
* An [MessageBus] that takes the Executor of your choice and uses it to dispatch events,
* allowing dispatch to occur asynchronously.
*
* @author Cliff
* @since 10.0
*/
class AsyncMessageBus<T : Any> : MessageBus<T> {
/**
* Creates a new AsyncEventBus that will use `executor` to dispatch events. Assigns `identifier` as the bus's name for logging purposes.
*
* @param identifier short name for the bus, for logging purposes.
* @param executor Executor to use to dispatch events. It is the caller's responsibility to shut
* down the executor after the last event has been posted to this event bus.
*/
constructor(identifier: String, executor: Executor) : super(
identifier,
executor,
Dispatcher.legacyAsync(),
LoggingHandler()
)
/**
* Creates a new AsyncEventBus that will use `executor` to dispatch events.
*
* @param executor Executor to use to dispatch events. It is the caller's responsibility to shut
* down the executor after the last event has been posted to this event bus.
* @param subscriberExceptionHandler Handler used to handle exceptions thrown from subscribers.
* See [SubscriberExceptionHandler] for more information.
* @since 16.0
*/
constructor(executor: Executor, subscriberExceptionHandler: SubscriberExceptionHandler<T>) : super(
"default",
executor,
Dispatcher.legacyAsync(),
subscriberExceptionHandler
)
/**
* Creates a new AsyncEventBus that will use `executor` to dispatch events.
*
* @param executor Executor to use to dispatch events. It is the caller's responsibility to shut
* down the executor after the last event has been posted to this event bus.
*/
constructor(executor: Executor) : super(
"default",
executor,
Dispatcher.legacyAsync(),
LoggingHandler()
)
}

View File

@@ -0,0 +1,39 @@
package com.synebula.gaea.bus.messagebus
import com.synebula.gaea.bus.IBus
import java.lang.reflect.Method
interface IMessageBus<T : Any> : IBus<T> {
/**
* 注册事件Listener
* @param topics 主题
* @param subscriber subscriber对象
*/
fun register(topics: Array<String>, subscriber: Any)
/**
* 注册事件Listener
* @param topics 主题
* @param subscriber subscriber对象
* @param method Listener方法
*/
fun register(topics: Array<String>, subscriber: Any, method: Method)
/**
* 取消注册事件Listener
* @param topic 主题
* @param subscriber subscriber对象
*/
fun unregister(topic: String, subscriber: Any)
/**
* 同步发布事件
* @param topic 主题
* @param message 事件
*/
fun publish(topic: String, message: T)
}

View File

@@ -0,0 +1,280 @@
/*
* Copyright (C) 2007 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.synebula.gaea.bus.messagebus
import com.synebula.gaea.bus.*
import java.lang.reflect.Method
import java.util.*
import java.util.concurrent.Executor
import java.util.logging.Level
import java.util.logging.Logger
/**
* Dispatches events to listeners, and provides ways for listeners to register themselves.
*
* <h2>Avoid EventBus</h2>
*
*
* **We recommend against using EventBus.** It was designed many years ago, and newer
* libraries offer better ways to decouple components and react to events.
*
*
* To decouple components, we recommend a dependency-injection framework. For Android code, most
* apps use [Dagger](https://dagger.dev). For server code, common options include [Guice](https://github.com/google/guice/wiki/Motivation) and [Spring](https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-introduction).
* Frameworks typically offer a way to register multiple listeners independently and then request
* them together as a set ([Dagger](https://dagger.dev/dev-guide/multibindings), [Guice](https://github.com/google/guice/wiki/Multibindings), [Spring](https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-autowired-annotation)).
*
*
* To react to events, we recommend a reactive-streams framework like [RxJava](https://github.com/ReactiveX/RxJava/wiki) (supplemented with its [RxAndroid](https://github.com/ReactiveX/RxAndroid) extension if you are building for
* Android) or [Project Reactor](https://projectreactor.io/). (For the basics of
* translating code from using an event bus to using a reactive-streams framework, see these two
* guides: [1](https://blog.jkl.gg/implementing-an-event-bus-with-rxjava-rxbus/), [2](https://lorentzos.com/rxjava-as-event-bus-the-right-way-10a36bdd49ba).) Some usages
* of EventBus may be better written using [Kotlin coroutines](https://kotlinlang.org/docs/coroutines-guide.html), including [Flow](https://kotlinlang.org/docs/flow.html) and [Channels](https://kotlinlang.org/docs/channels.html). Yet other usages are better served
* by individual libraries that provide specialized support for particular use cases.
*
*
* Disadvantages of EventBus include:
*
*
* * It makes the cross-references between producer and subscriber harder to find. This can
* complicate debugging, lead to unintentional reentrant calls, and force apps to eagerly
* initialize all possible subscribers at startup time.
* * It uses reflection in ways that break when code is processed by optimizers/minimizer like
* [R8 and Proguard](https://developer.android.com/studio/build/shrink-code).
* * It doesn't offer a way to wait for multiple events before taking action. For example, it
* doesn't offer a way to wait for multiple producers to all report that they're "ready," nor
* does it offer a way to batch multiple events from a single producer together.
* * It doesn't support backpressure and other features needed for resilience.
* * It doesn't provide much control of threading.
* * It doesn't offer much monitoring.
* * It doesn't propagate exceptions, so apps don't have a way to react to them.
* * It doesn't interoperate well with RxJava, coroutines, and other more commonly used
* alternatives.
* * It imposes requirements on the lifecycle of its subscribers. For example, if an event
* occurs between when one subscriber is removed and the next subscriber is added, the event
* is dropped.
* * Its performance is suboptimal, especially under Android.
* * It [doesn't support parameterized
* types](https://github.com/google/guava/issues/1431).
* * With the introduction of lambdas in Java 8, EventBus went from less verbose than listeners
* to [more verbose](https://github.com/google/guava/issues/3311).
*
*
* <h2>EventBus Summary</h2>
*
*
* The EventBus allows publish-subscribe-style communication between components without requiring
* the components to explicitly register with one another (and thus be aware of each other). It is
* designed exclusively to replace traditional Java in-process event distribution using explicit
* registration. It is *not* a general-purpose publish-subscribe system, nor is it intended
* for interprocess communication.
*
* <h2>Receiving Events</h2>
*
*
* To receive events, an object should:
*
*
* 1. Expose a public method, known as the *event subscriber*, which accepts a single
* argument of the type of event desired;
* 1. Mark it with a [Subscribe] annotation;
* 1. Pass itself to an EventBus instance's [.register] method.
*
*
* <h2>Posting Events</h2>
*
*
* To post an event, simply provide the event object to the [.post] method. The
* EventBus instance will determine the type of event and route it to all registered listeners.
*
*
* Events are routed based on their type an event will be delivered to any subscriber for
* any type to which the event is *assignable.* This includes implemented interfaces, all
* superclasses, and all interfaces implemented by superclasses.
*
*
* When `post` is called, all registered subscribers for an event are run in sequence, so
* subscribers should be reasonably quick. If an event may trigger an extended process (such as a
* database load), spawn a thread or queue it for later. (For a convenient way to do this, use an
* [AsyncMessageBus].)
*
* <h2>Subscriber Methods</h2>
*
*
* Event subscriber methods must accept only one argument: the event.
*
*
* Subscribers should not, in general, throw. If they do, the EventBus will catch and log the
* exception. This is rarely the right solution for error handling and should not be relied upon; it
* is intended solely to help find problems during development.
*
*
* The EventBus guarantees that it will not call a subscriber method from multiple threads
* simultaneously, unless the method explicitly allows it by bearing the [ ] annotation. If this annotation is not present, subscriber methods need not
* worry about being reentrant, unless also called from outside the EventBus.
*
* <h2>Dead Events</h2>
*
*
* If an event is posted, but no registered subscribers can accept it, it is considered "dead."
* To give the system a second chance to handle dead events, they are wrapped in an instance of
* [DeadMessage] and reposted.
*
*
* If a subscriber for a supertype of all events (such as Object) is registered, no event will
* ever be considered dead, and no DeadEvents will be generated. Accordingly, while DeadMessage
* extends [Object], a subscriber registered to receive any Object will never receive a
* DeadMessage.
*
*
* This class is safe for concurrent use.
*
*
* See the Guava User Guide article on [`EventBus`](https://github.com/google/guava/wiki/EventBusExplained).
*
* @author Cliff
* @param identifier a brief name for this bus, for logging purposes. Should/home/alex/privacy/project/myths/gaea be a valid Java
* @param executor the default executor this event bus uses for dispatching events to subscribers.
* @param dispatcher message dispatcher.
* @param exceptionHandler Handler for subscriber exceptions.
*/
open class MessageBus<T : Any> internal constructor(
override val identifier: String,
override val executor: Executor,
val dispatcher: Dispatcher<T>,
val exceptionHandler: SubscriberExceptionHandler<T>,
) : IMessageBus<T> {
val DEAD_TOPIC = "DEAD_TOPIC"
val subscribers = TopicSubscriberRegistry(this)
/**
* Creates a new EventBus with the given `identifier`.
*
* @param identifier a brief name for this bus, for logging purposes. Should/home/alex/privacy/project/myths/gaea be a valid Java
* identifier.
*/
@JvmOverloads
constructor(identifier: String = "default") : this(
identifier,
Executor { it.run() },
Dispatcher.perThreadDispatchQueue(),
LoggingHandler()
)
/**
* Creates a new EventBus with the given [SubscriberExceptionHandler].
*
* @param exceptionHandler Handler for subscriber exceptions.
* @since 16.0
*/
constructor(exceptionHandler: SubscriberExceptionHandler<T>) : this(
"default",
Executor { it.run() },
Dispatcher.perThreadDispatchQueue(),
exceptionHandler
)
override fun register(topics: Array<String>, subscriber: Any) {
subscribers.register(topics, subscriber)
}
/**
* Registers subscriber method on `object` to receive messages.
*
* @param topics method subscribe topic.
* @param subscriber subscriber method declare object.
* @param method subscriber method should be registered.
*/
override fun register(topics: Array<String>, subscriber: Any, method: Method) {
subscribers.register(topics, subscriber, method)
}
/**
* Registers all subscriber methods on `object` to receive messages.
*
* @param subscriber object whose subscriber methods should be registered.
*/
override fun register(subscriber: Any) {
subscribers.register(subscriber)
}
override fun register(subscriber: Any, method: Method) {
subscribers.register(subscriber, method)
}
/**
* Unregisters all subscriber methods on a registered `object`.
*
* @param subscriber object whose subscriber methods should be unregistered.
* @throws IllegalArgumentException if the object was not previously registered.
*/
override fun unregister(subscriber: Any) {
subscribers.unregister(subscriber)
}
override fun unregister(topic: String, subscriber: Any) {
subscribers.unregister(topic, subscriber)
}
/**
* Posts an message to all registered subscribers. This method will return successfully after the
* message has been posted to all subscribers, and regardless of any exceptions thrown by
* subscribers.
*
* @param message message to post.
*/
override fun publish(message: T) {
val messageSubscribers = subscribers.getSubscribers(message.javaClass.name)
if (messageSubscribers.hasNext()) {
dispatcher.dispatch(message, messageSubscribers)
} else {
// the message had no subscribers and was not itself a DeadMessage
publish(DEAD_TOPIC, message)
}
}
override fun publish(topic: String, message: T) {
val messageSubscribers = subscribers.getSubscribers(topic)
if (messageSubscribers.hasNext()) {
dispatcher.dispatch(message, messageSubscribers)
} else if (topic != DEAD_TOPIC) {
// the message had no subscribers and was not itself a DeadMessage
publish(DEAD_TOPIC, message)
}
}
/** Handles the given exception thrown by a subscriber with the given context. */
override fun handleException(cause: Throwable?, context: SubscriberExceptionContext<T>) {
try {
exceptionHandler.handleException(cause, context)
} catch (e2: Throwable) {
// if the handler threw an exception... well, just log it
logger.log(
Level.SEVERE, String.format(Locale.ROOT, "Exception %s thrown while handling exception: %s", e2, cause),
e2
)
}
}
override fun toString(): String {
return "MessageBus(identifier='$identifier', executor=$executor, dispatcher=$dispatcher, exceptionHandler=$exceptionHandler, subscribers=$subscribers)"
}
companion object {
private val logger = Logger.getLogger(Bus::class.java.name)
}
}

View File

@@ -0,0 +1,184 @@
/*
* Copyright (C) 2014 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.synebula.gaea.bus.messagebus
import com.synebula.gaea.bus.Subscribe
import com.synebula.gaea.bus.Subscriber
import com.synebula.gaea.bus.SubscriberRegistry
import java.lang.reflect.Method
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArraySet
/**
* Registry of subscribers to a single message bus.
*
* @author Colin Decker
*/
open class TopicSubscriberRegistry<T : Any>(private val bus: IMessageBus<T>) : SubscriberRegistry<T>(bus) {
/**
* All registered subscribers, indexed by message type.
*
*
* The [CopyOnWriteArraySet] values make it easy and relatively lightweight to get an
* immutable snapshot of all current subscribers to an message without any locking.
*/
private val subscribers = ConcurrentHashMap<String, CopyOnWriteArraySet<Subscriber<T>>>()
open fun register(topics: Array<String>, subscriber: Any) {
val listenerMethods = findAllSubscribers(subscriber)
for (topic in topics) {
for ((_, messageMethodsInListener) in listenerMethods) {
var messageSubscribers = subscribers[topic]
if (messageSubscribers == null) {
val newSet = CopyOnWriteArraySet<Subscriber<T>>()
messageSubscribers = subscribers.putIfAbsent(topic, newSet) ?: newSet
}
messageSubscribers.addAll(messageMethodsInListener)
}
}
}
open fun register(topics: Array<String>, subscriber: Any, method: Method) {
for (topic in topics) {
var messageSubscribers = subscribers[topic]
if (messageSubscribers == null) {
val newSet = CopyOnWriteArraySet<Subscriber<T>>()
messageSubscribers = subscribers.putIfAbsent(topic, newSet) ?: newSet
}
messageSubscribers.add(Subscriber.create(this.bus, subscriber, method))
}
}
/** Registers all subscriber methods on the given subscriber object. */
override fun register(subscriber: Any) {
val listenerMethods = findAllSubscribers(subscriber)
for ((topic, messageMethodsInListener) in listenerMethods) {
var messageSubscribers = subscribers[topic]
if (messageSubscribers == null) {
val newSet = CopyOnWriteArraySet<Subscriber<T>>()
messageSubscribers = subscribers.putIfAbsent(topic, newSet) ?: newSet
}
messageSubscribers.addAll(messageMethodsInListener)
}
}
/** Registers subscriber method on the given subscriber object. */
override fun register(subscriber: Any, method: Method) {
val parameterTypes = method.parameterTypes
check(parameterTypes.size == 1) {
"Method $method has @SubscribeTopic annotation but has ${parameterTypes.size} parameters. Subscriber methods must have exactly 1 parameter."
}
check(!parameterTypes[0].isPrimitive) {
"@SubscribeTopic method $method's parameter is ${parameterTypes[0].name}. Subscriber methods cannot accept primitives. "
}
val eventType = parameterTypes[0]
val topic = eventType.name
var messageSubscribers = subscribers[topic]
if (messageSubscribers == null) {
val newSet = CopyOnWriteArraySet<Subscriber<T>>()
messageSubscribers = subscribers.putIfAbsent(topic, newSet) ?: newSet
}
messageSubscribers.add(Subscriber.create(bus, subscriber, method))
}
/** Unregisters all subscribers on the given subscriber object. */
override fun unregister(subscriber: Any) {
val listenerMethods = findAllSubscribers(subscriber)
for ((topic, listenerMethodsForType) in listenerMethods) {
val currentSubscribers = subscribers[topic]
require(!(currentSubscribers == null || !currentSubscribers.removeAll(listenerMethodsForType.toSet()))) {
// if removeAll returns true, all we really know is that at least one subscriber was
// removed... however, barring something very strange we can assume that if at least one
// subscriber was removed, all subscribers on subscriber for that message type were... after
// all, the definition of subscribers on a particular class is totally static
"missing message subscriber for an annotated method. Is $subscriber registered?"
}
// don't try to remove the set if it's empty; that can't be done safely without a lock
// anyway, if the set is empty it'll just be wrapping an array of length 0
}
}
/** Unregisters all subscribers by topic on the given subscriber object. */
open fun unregister(topic: String, subscriber: Any) {
val listenerMethods = findAllSubscribers(subscriber)
if (listenerMethods[topic] == null) return //不包含topic则推出
val currentSubscribers = subscribers[topic]
require(!(currentSubscribers == null || !currentSubscribers.removeAll(listenerMethods[topic]!!.toSet()))) {
// if removeAll returns true, all we really know is that at least one subscriber was
// removed... however, barring something very strange we can assume that if at least one
// subscriber was removed, all subscribers on subscriber for that message type were... after
// all, the definition of subscribers on a particular class is totally static
"missing message subscriber for an annotated method. Is $subscriber registered?"
}
// don't try to remove the set if it's empty; that can't be done safely without a lock
// anyway, if the set is empty it'll just be wrapping an array of length 0
}
/**
* Gets an iterator representing an immutable snapshot of all subscribers to the given message at
* the time this method is called.
*/
fun getSubscribers(topic: String): Iterator<Subscriber<T>> {
val topicSubscribers: CopyOnWriteArraySet<Subscriber<T>> = subscribers[topic] ?: CopyOnWriteArraySet()
return topicSubscribers.iterator()
}
/**
* Returns all subscribers for the given subscriber grouped by the type of message they subscribe to.
*/
private fun findAllSubscribers(subscriber: Any): Map<String, List<Subscriber<T>>> {
val methodsInListener = mutableMapOf<String, MutableList<Subscriber<T>>>()
val clazz: Class<*> = subscriber.javaClass
for (method in getAnnotatedMethods(clazz)) {
var topics = method.getAnnotation(Subscribe::class.java).topics
//如果没有定义topic则使用消息类名称做topic
if (topics.isEmpty()) {
val parameterTypes = method.parameterTypes
val messageType = parameterTypes[0]
topics = arrayOf(messageType.name)
}
for (topic in topics) {
val listeners = methodsInListener.getOrDefault(topic, mutableListOf())
listeners.add(Subscriber.create(bus, subscriber, method))
methodsInListener[topic] = listeners
}
}
return methodsInListener
}
class MethodIdentifier(method: Method) {
private val name: String = method.name
private val parameterTypes: List<Class<*>> = listOf(*method.parameterTypes)
override fun hashCode(): Int {
var result = name.hashCode()
for (element in parameterTypes) result = 31 * result + element.hashCode()
return result
}
override fun equals(other: Any?): Boolean {
if (other is MethodIdentifier) {
return name == other.name && parameterTypes == other.parameterTypes
}
return false
}
}
}

View File

@@ -0,0 +1,94 @@
package com.synebula.gaea.bus
import com.synebula.gaea.bus.messagebus.AsyncMessageBus
import com.synebula.gaea.bus.messagebus.IMessageBus
import com.synebula.gaea.bus.messagebus.MessageBus
import org.junit.Test
import java.util.concurrent.Executors
class BusTest {
@Test
fun testBus() {
val bus: IBus<Any> = Bus()
val subscriber = TestSubscriber()
bus.register(subscriber)
bus.publish("Hello world")
bus.publish(subscriber)
bus.publish(1)
}
@Test
fun testAsyncBus() {
val bus: IBus<Any> = AsyncBus(Executors.newFixedThreadPool(10))
val subscriber = TestSubscriber()
bus.register(subscriber, subscriber.javaClass.declaredMethods[0])
bus.register(subscriber, subscriber.javaClass.declaredMethods[1])
bus.publish("Hello world")
bus.publish(subscriber)
}
@Test
fun testMessageBus() {
val bus: IMessageBus<Any> = MessageBus()
val subscriber = TestTopicSubscriber()
bus.register(subscriber)
bus.publish("hello", "Hello world")
bus.publish("whoami", subscriber)
}
@Test
fun testMessageBus2() {
val bus: IMessageBus<Any> = AsyncMessageBus(Executors.newFixedThreadPool(10))
val subscriber = TestTopicSubscriber()
val subscriber2 = TestTopicSubscriber2()
bus.register(subscriber)
bus.register(subscriber2)
bus.publish("hello", "Hello world")
bus.publish("whoami", subscriber)
bus.unregister(subscriber2)
bus.publish("hello", "Hello world")
bus.publish("whoami", subscriber)
}
internal class TestSubscriber {
@Subscribe
fun echo(name: String) {
println(name)
}
@Subscribe
fun whoAmI(testSubscriber: TestSubscriber) {
println(testSubscriber.javaClass.name)
}
}
internal class TestTopicSubscriber {
@Subscribe(topics = ["hello"])
fun echo(name: String) {
println(name)
}
@Subscribe(topics = ["whoami"])
fun whoAmI(testSubscriber: TestTopicSubscriber) {
println(testSubscriber.javaClass.name)
}
}
internal class TestTopicSubscriber2 {
@Subscribe(topics = ["hello"])
fun echo(name: String) {
println("2 $name")
}
@Subscribe(topics = ["whoami"])
fun whoAmI(testSubscriber: TestTopicSubscriber) {
println("2 ${testSubscriber.javaClass.name}")
}
}
}