1.3.0 简化bus使用,增加remove前后事件,简化auto config注册方式

This commit is contained in:
2022-08-23 15:47:22 +08:00
parent 53709b6fc3
commit 58402a6400
28 changed files with 391 additions and 823 deletions

View File

@@ -17,7 +17,7 @@ buildscript {
subprojects { subprojects {
group 'com.synebula' group 'com.synebula'
version '1.2.0' version '1.3.0'
buildscript { buildscript {
repositories { repositories {

View File

@@ -7,9 +7,8 @@ import org.springframework.beans.factory.BeanFactory
class ServiceFactory( class ServiceFactory(
supertype: Class<*>, supertype: Class<*>,
var beanFactory: BeanFactory, var beanFactory: BeanFactory,
var implementBeanNames: Array<String> = arrayOf()
) : Factory(supertype) { ) : Factory(supertype) {
override fun createProxy(): Proxy { override fun createProxy(): Proxy {
return ServiceProxy(supertype, this.beanFactory, this.implementBeanNames) return ServiceProxy(supertype, this.beanFactory)
} }
} }

View File

@@ -1,49 +1,57 @@
package com.synebula.gaea.app.autoconfig.service package com.synebula.gaea.app.autoconfig.service
import com.synebula.gaea.bus.IBus
import com.synebula.gaea.data.serialization.IObjectMapper import com.synebula.gaea.data.serialization.IObjectMapper
import com.synebula.gaea.domain.repository.IRepository import com.synebula.gaea.domain.repository.IRepository
import com.synebula.gaea.domain.repository.IRepositoryFactory import com.synebula.gaea.domain.repository.IRepositoryFactory
import com.synebula.gaea.domain.service.Domain import com.synebula.gaea.domain.service.Domain
import com.synebula.gaea.domain.service.IService import com.synebula.gaea.domain.service.IService
import com.synebula.gaea.domain.service.Service import com.synebula.gaea.domain.service.Service
import com.synebula.gaea.exception.NoticeUserException
import com.synebula.gaea.spring.autoconfig.Proxy import com.synebula.gaea.spring.autoconfig.Proxy
import org.springframework.beans.factory.BeanFactory import org.springframework.beans.factory.BeanFactory
import org.springframework.core.ResolvableType
import java.io.InvalidClassException import java.io.InvalidClassException
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method import java.lang.reflect.Method
class ServiceProxy( class ServiceProxy(
private var supertype: Class<*>, private var supertype: Class<*>,
private var beanFactory: BeanFactory, implementBeanNames: Array<String> = arrayOf() private var beanFactory: BeanFactory
) : Proxy() { ) : Proxy() {
private var service: IService<*> private var service: IService<*>
init { init {
// 如果没有实现类, 使用Service类代理 // 如果没有实现类, 使用Service类代理
if (implementBeanNames.isEmpty()) { // 如果没有实现类并且没有ServiceDependency注解, 则抛出异常
// 如果没有实现类并且没有ServiceDependency注解, 则抛出异常 if (!this.supertype.declaredAnnotations.any { it.annotationClass == Domain::class }) {
if (!this.supertype.declaredAnnotations.any { it.annotationClass == Domain::class }) { throw InvalidClassException(
throw InvalidClassException( "interface ${this.supertype.name} must has implementation class or annotation by ${Domain::class.qualifiedName}"
"interface ${this.supertype.name} must has implementation class or annotation by ${Domain::class.qualifiedName}"
)
}
val domain = this.supertype.getDeclaredAnnotation(Domain::class.java)
// repository工厂对象
val defaultRepositoryFactory = this.beanFactory.getBean(IRepositoryFactory::class.java)
val mapper = this.beanFactory.getBean(IObjectMapper::class.java)
val constructor = Service::class.java.getConstructor(
Class::class.java, IRepository::class.java, IObjectMapper::class.java
) )
this.service = }
constructor.newInstance( val domain = this.supertype.getDeclaredAnnotation(Domain::class.java)
domain.clazz.java,
defaultRepositoryFactory.createRawRepository(domain.clazz.java), // repository工厂对象
mapper val defaultRepositoryFactory = this.beanFactory.getBean(IRepositoryFactory::class.java)
) val mapper = this.beanFactory.getBean(IObjectMapper::class.java)
} else {
this.service = this.beanFactory.getBean(implementBeanNames[0]) as IService<*> val constructor = Service::class.java.getConstructor(
Class::class.java, IRepository::class.java, IObjectMapper::class.java
)
this.service =
constructor.newInstance(
domain.clazz.java,
defaultRepositoryFactory.createRawRepository(domain.clazz.java),
mapper
)
// 尝试注入IBus对象
val bus = Service::class.java.getDeclaredField("bus")
val iBusObjectProvider = this.beanFactory.getBeanProvider<IBus<*>>(ResolvableType.forField(bus))
iBusObjectProvider.ifAvailable { busBean ->
bus.isAccessible = true
bus.set(this.service, busBean)
} }
} }
@@ -61,6 +69,11 @@ class ServiceProxy(
return proxyMethod.invoke(this.service, *args) return proxyMethod.invoke(this.service, *args)
} catch (ex: NoSuchMethodException) { } catch (ex: NoSuchMethodException) {
throw NoSuchMethodException("method [${method.toGenericString()}] not implements in class [${this.service::class.java}], you must implements interface [${this.supertype.name}] ") throw NoSuchMethodException("method [${method.toGenericString()}] not implements in class [${this.service::class.java}], you must implements interface [${this.supertype.name}] ")
} catch (ex: InvocationTargetException) {
if (ex.cause is Error || ex.cause is NoticeUserException) {
throw ex.targetException!!
}
throw ex
} }
} }
} }

View File

@@ -30,20 +30,20 @@ class ServiceRegister : Register() {
// 尝试获取实际继承类型 // 尝试获取实际继承类型
val implBeanDefinitions = this.doScan(basePackages, arrayOf(this.interfaceFilter(arrayOf(beanClazz)))) val implBeanDefinitions = this.doScan(basePackages, arrayOf(this.interfaceFilter(arrayOf(beanClazz))))
implBeanDefinitions.forEach { if (implBeanDefinitions.isNotEmpty()) {
it.isAutowireCandidate = false implBeanDefinitions.forEach {
result[it.beanClassName!!] = it result[it.beanClassName!!] = it
}
} else {
// 构造BeanDefinition
val builder = BeanDefinitionBuilder.genericBeanDefinition(beanClazz)
builder.addConstructorArgValue(beanClazz)
builder.addConstructorArgValue(this._beanFactory)
val definition = builder.rawBeanDefinition as GenericBeanDefinition
definition.beanClass = ServiceFactory::class.java
definition.autowireMode = GenericBeanDefinition.AUTOWIRE_BY_TYPE
result[beanClazz.name] = definition
} }
// 构造BeanDefinition
val builder = BeanDefinitionBuilder.genericBeanDefinition(beanClazz)
builder.addConstructorArgValue(beanClazz)
builder.addConstructorArgValue(this._beanFactory)
builder.addConstructorArgValue(implBeanDefinitions.map { it.beanClassName })
val definition = builder.rawBeanDefinition as GenericBeanDefinition
definition.beanClass = ServiceFactory::class.java
definition.autowireMode = GenericBeanDefinition.AUTOWIRE_BY_TYPE
result[beanClazz.name] = definition
} }
return result return result
} }

View File

@@ -4,5 +4,5 @@ import com.synebula.gaea.bus.Bus
import org.springframework.stereotype.Component import org.springframework.stereotype.Component
@Component @Component
class EventBus : Bus() class EventBus : Bus<Any>()

View File

@@ -14,7 +14,7 @@ class EventBusSubscriberProcessor : BeanPostProcessor {
// 事件总线bean由Spring IoC容器负责创建这里只需要通过@Autowired注解注入该bean即可使用事件总线 // 事件总线bean由Spring IoC容器负责创建这里只需要通过@Autowired注解注入该bean即可使用事件总线
@Autowired @Autowired
var messageBus: IBus<Any>? = null var bus: IBus<Any>? = null
@Throws(BeansException::class) @Throws(BeansException::class)
override fun postProcessBeforeInitialization(bean: Any, beanName: String): Any { override fun postProcessBeforeInitialization(bean: Any, beanName: String): Any {
@@ -34,12 +34,15 @@ class EventBusSubscriberProcessor : BeanPostProcessor {
if (annotation.annotationClass == Subscribe::class) { if (annotation.annotationClass == Subscribe::class) {
// 如果这是一个Guava @Subscribe注解的事件监听器方法说明所在bean实例 // 如果这是一个Guava @Subscribe注解的事件监听器方法说明所在bean实例
// 对应一个Guava事件监听器类将该bean实例注册到Guava事件总线 // 对应一个Guava事件监听器类将该bean实例注册到Guava事件总线
messageBus?.register(bean) val subscribe = annotation as Subscribe
if (subscribe.topics.isEmpty())
bus?.register(bean, method)
else
bus?.register(subscribe.topics, bean, method)
return bean return bean
} }
} }
} }
return bean return bean
} }
} }

View File

@@ -7,9 +7,8 @@ import org.springframework.beans.factory.BeanFactory
class MongodbRepoFactory( class MongodbRepoFactory(
supertype: Class<*>, supertype: Class<*>,
var beanFactory: BeanFactory, var beanFactory: BeanFactory,
var implementBeanNames: Array<String> = arrayOf()
) : Factory(supertype) { ) : Factory(supertype) {
override fun createProxy(): Proxy { override fun createProxy(): Proxy {
return MongodbRepoProxy(supertype, this.beanFactory, this.implementBeanNames) return MongodbRepoProxy(supertype, this.beanFactory)
} }
} }

View File

@@ -11,7 +11,7 @@ import org.springframework.data.mongodb.core.MongoTemplate
import java.lang.reflect.Method import java.lang.reflect.Method
class MongodbRepoProxy( class MongodbRepoProxy(
private var supertype: Class<*>, private var beanFactory: BeanFactory, implementBeanNames: Array<String> = arrayOf() private var supertype: Class<*>, private var beanFactory: BeanFactory
) : Proxy() { ) : Proxy() {
private var mongodbRepo: Any private var mongodbRepo: Any
@@ -28,15 +28,11 @@ class MongodbRepoProxy(
interfaceClazz = IQuery::class.java interfaceClazz = IQuery::class.java
} }
if (implementBeanNames.isEmpty()) { val constructor = clazz.getConstructor(Class::class.java, MongoTemplate::class.java)
val constructor = clazz.getConstructor(Class::class.java, MongoTemplate::class.java) this.mongodbRepo = constructor.newInstance(
this.mongodbRepo = constructor.newInstance( this.supertype.getGenericInterface(interfaceClazz)!!.actualTypeArguments[0],
this.supertype.getGenericInterface(interfaceClazz)!!.actualTypeArguments[0], this.beanFactory.getBean(MongoTemplate::class.java)
this.beanFactory.getBean(MongoTemplate::class.java) )
)
} else {
this.mongodbRepo = this.beanFactory.getBean(implementBeanNames[0])
}
} }
/** /**

View File

@@ -34,20 +34,20 @@ class MongodbRepoRegister : Register() {
// 尝试获取实际继承类型 // 尝试获取实际继承类型
val implBeanDefinitions = this.doScan(basePackages, arrayOf(this.interfaceFilter(arrayOf(beanClazz)))) val implBeanDefinitions = this.doScan(basePackages, arrayOf(this.interfaceFilter(arrayOf(beanClazz))))
implBeanDefinitions.forEach {
it.isAutowireCandidate = false
result[it.beanClassName!!] = it
}
// 构造BeanDefinition if (implBeanDefinitions.isNotEmpty()) {
val builder = BeanDefinitionBuilder.genericBeanDefinition(beanClazz) implBeanDefinitions.forEach {
builder.addConstructorArgValue(beanClazz) result[it.beanClassName!!] = it
builder.addConstructorArgValue(this._beanFactory) }
builder.addConstructorArgValue(implBeanDefinitions.map { it.beanClassName }) } else { // 构造BeanDefinition
val definition = builder.rawBeanDefinition as GenericBeanDefinition val builder = BeanDefinitionBuilder.genericBeanDefinition(beanClazz)
definition.beanClass = MongodbRepoFactory::class.java builder.addConstructorArgValue(beanClazz)
definition.autowireMode = GenericBeanDefinition.AUTOWIRE_BY_TYPE builder.addConstructorArgValue(this._beanFactory)
result[beanClazz.name] = definition val definition = builder.rawBeanDefinition as GenericBeanDefinition
definition.beanClass = MongodbRepoFactory::class.java
definition.autowireMode = GenericBeanDefinition.AUTOWIRE_BY_TYPE
result[beanClazz.name] = definition
}
} }
return result return result
} }

View File

@@ -40,7 +40,7 @@ open class MongodbRepository<TAggregateRoot : IAggregateRoot<ID>, ID>(
this.repo.save(list) this.repo.save(list)
} }
override fun <TAggregateRoot> count(params: Map<String, Any>?): Int { override fun count(params: Map<String, Any>?): Int {
val query = Query() val query = Query()
return this.repo.count(query.where(params, clazz), clazz).toInt() return this.repo.count(query.where(params, clazz), clazz).toInt()
} }

View File

@@ -61,13 +61,13 @@ abstract class AppAspect {
point.proceed() point.proceed()
} catch (ex: Throwable) { } catch (ex: Throwable) {
//找到类的模块名称,否则使用类名 //找到类的模块名称,否则使用类名
var moduleName = clazz.name var moduleName = clazz.simpleName
val module = clazz.annotations.find { it is Module } val module = clazz.annotations.find { it is Module }
if (module != null && module is Module) { if (module != null && module is Module) {
moduleName = module.name moduleName = module.name
} }
var message = "$moduleName - $funcName 异常" var message = "$moduleName - $funcName 异常"
message = if (ex is NoticeUserException) { message = if (ex is NoticeUserException || ex is Error) {
"$message: ${ex.message}" "$message: ${ex.message}"
} else { } else {
"$message" "$message"

View File

@@ -14,7 +14,7 @@
package com.synebula.gaea.bus package com.synebula.gaea.bus
/** /**
* Marks an message subscriber method as being thread-safe. This annotation indicates that MessageBus * Marks a message subscriber method as being thread-safe. This annotation indicates that MessageBus
* may invoke the message subscriber simultaneously from multiple threads. * may invoke the message subscriber simultaneously from multiple threads.
* *
* *

View File

@@ -1,68 +0,0 @@
/*
* Copyright (C) 2007 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.synebula.gaea.bus
import java.util.concurrent.Executor
/**
* An [Bus] that takes the Executor of your choice and uses it to dispatch messages,
* allowing dispatch to occur asynchronously.
*
* @author Cliff
* @since 10.0
*/
class AsyncBus : Bus {
/**
* Creates a new AsyncMessageBus that will use `executor` to dispatch messages. Assigns `identifier` as the bus's name for logging purposes.
*
* @param identifier short name for the bus, for logging purposes.
* @param executor Executor to use to dispatch messages. It is the caller's responsibility to shut
* down the executor after the last message has been posted to this message bus.
*/
constructor(identifier: String, executor: Executor) : super(
identifier,
executor,
Dispatcher.legacyAsync(),
LoggingHandler()
)
/**
* Creates a new AsyncMessageBus that will use `executor` to dispatch messages.
*
* @param executor Executor to use to dispatch messages. It is the caller's responsibility to shut
* down the executor after the last message has been posted to this message bus.
* @param subscriberExceptionHandler Handler used to handle exceptions thrown from subscribers.
* See [SubscriberExceptionHandler] for more information.
* @since 16.0
*/
constructor(executor: Executor, subscriberExceptionHandler: SubscriberExceptionHandler<Any>) : super(
"default",
executor,
Dispatcher.legacyAsync(),
subscriberExceptionHandler
)
/**
* Creates a new AsyncMessageBus that will use `executor` to dispatch messages.
*
* @param executor Executor to use to dispatch messages. It is the caller's responsibility to shut
* down the executor after the last message has been posted to this message bus.
*/
constructor(executor: Executor) : super(
"default",
executor,
Dispatcher.legacyAsync(),
LoggingHandler()
)
}

View File

@@ -37,7 +37,7 @@ import java.util.logging.Logger
* *
* To react to messages, we recommend a reactive-streams framework like [RxJava](https://github.com/ReactiveX/RxJava/wiki) (supplemented with its [RxAndroid](https://github.com/ReactiveX/RxAndroid) extension if you are building for * To react to messages, we recommend a reactive-streams framework like [RxJava](https://github.com/ReactiveX/RxJava/wiki) (supplemented with its [RxAndroid](https://github.com/ReactiveX/RxAndroid) extension if you are building for
* Android) or [Project Reactor](https://projectreactor.io/). (For the basics of * Android) or [Project Reactor](https://projectreactor.io/). (For the basics of
* translating code from using an message bus to using a reactive-streams framework, see these two * translating code from using a message bus to using a reactive-streams framework, see these two
* guides: [1](https://blog.jkl.gg/implementing-an-message-bus-with-rxjava-rxbus/), [2](https://lorentzos.com/rxjava-as-message-bus-the-right-way-10a36bdd49ba).) Some usages * guides: [1](https://blog.jkl.gg/implementing-an-message-bus-with-rxjava-rxbus/), [2](https://lorentzos.com/rxjava-as-message-bus-the-right-way-10a36bdd49ba).) Some usages
* of MessageBus may be better written using [Kotlin coroutines](https://kotlinlang.org/docs/coroutines-guide.html), including [Flow](https://kotlinlang.org/docs/flow.html) and [Channels](https://kotlinlang.org/docs/channels.html). Yet other usages are better served * of MessageBus may be better written using [Kotlin coroutines](https://kotlinlang.org/docs/coroutines-guide.html), including [Flow](https://kotlinlang.org/docs/flow.html) and [Channels](https://kotlinlang.org/docs/channels.html). Yet other usages are better served
* by individual libraries that provide specialized support for particular use cases. * by individual libraries that provide specialized support for particular use cases.
@@ -60,12 +60,10 @@ import java.util.logging.Logger
* * It doesn't propagate exceptions, so apps don't have a way to react to them. * * It doesn't propagate exceptions, so apps don't have a way to react to them.
* * It doesn't interoperate well with RxJava, coroutines, and other more commonly used * * It doesn't interoperate well with RxJava, coroutines, and other more commonly used
* alternatives. * alternatives.
* * It imposes requirements on the lifecycle of its subscribers. For example, if an message * * It imposes requirements on the lifecycle of its subscribers. For example, if a message
* occurs between when one subscriber is removed and the next subscriber is added, the message * occurs between when one subscriber is removed and the next subscriber is added, the message
* is dropped. * is dropped.
* * Its performance is suboptimal, especially under Android. * * Its performance is suboptimal, especially under Android.
* * It [doesn't support parameterized
* types](https://github.com/google/guava/issues/1431).
* * With the introduction of lambdas in Java 8, MessageBus went from less verbose than listeners * * With the introduction of lambdas in Java 8, MessageBus went from less verbose than listeners
* to [more verbose](https://github.com/google/guava/issues/3311). * to [more verbose](https://github.com/google/guava/issues/3311).
* *
@@ -94,19 +92,15 @@ import java.util.logging.Logger
* <h2>Posting Messages</h2> * <h2>Posting Messages</h2>
* *
* *
* To post an message, simply provide the message object to the [.post] method. The * To post a message, simply provide the message object to the [.post] method. The
* MessageBus instance will determine the type of message and route it to all registered listeners. * MessageBus instance will determine the type of message and route it to all registered listeners.
* *
* *
* Messages are routed based on their type an message will be delivered to any subscriber for * Messages are routed based on their type a message will be delivered to any subscriber for
* any type to which the message is *assignable.* This includes implemented interfaces, all * any type to which the message is *assignable.* This includes implemented interfaces, all
* superclasses, and all interfaces implemented by superclasses. * thisclasses, and all interfaces implemented by thisclasses.
* *
* *
* When `post` is called, all registered subscribers for an message are run in sequence, so
* subscribers should be reasonably quick. If an message may trigger an extended process (such as a
* database load), spawn a thread or queue it for later. (For a convenient way to do this, use an
* [AsyncBus].)
* *
* <h2>Subscriber Methods</h2> * <h2>Subscriber Methods</h2>
* *
@@ -126,12 +120,12 @@ import java.util.logging.Logger
* <h2>Dead Messages</h2> * <h2>Dead Messages</h2>
* *
* *
* If an message is posted, but no registered subscribers can accept it, it is considered "dead." * If a message is posted, but no registered subscribers can accept it, it is considered "dead."
* To give the system a second chance to handle dead messages, they are wrapped in an instance of * To give the system a second chance to handle dead messages, they are wrapped in an instance of
* [DeadMessage] and reposted. * [DeadMessage] and reposted.
* *
* *
* If a subscriber for a supertype of all messages (such as Object) is registered, no message will * If a subscriber for this type of all messages (such as Object) is registered, no message will
* ever be considered dead, and no DeadMessages will be generated. Accordingly, while DeadMessage * ever be considered dead, and no DeadMessages will be generated. Accordingly, while DeadMessage
* extends [Object], a subscriber registered to receive any Object will never receive a * extends [Object], a subscriber registered to receive any Object will never receive a
* DeadMessage. * DeadMessage.
@@ -140,8 +134,6 @@ import java.util.logging.Logger
* This class is safe for concurrent use. * This class is safe for concurrent use.
* *
* *
* See the Guava User Guide article on [`MessageBus`](https://github.com/google/guava/wiki/MessageBusExplained).
*
* @author Cliff * @author Cliff
* @since 10.0 * @since 10.0
* @param identifier a brief name for this bus, for logging purposes. Should/home/alex/privacy/project/myths/gaea be a valid Java * @param identifier a brief name for this bus, for logging purposes. Should/home/alex/privacy/project/myths/gaea be a valid Java
@@ -149,14 +141,16 @@ import java.util.logging.Logger
* @param dispatcher message dispatcher. * @param dispatcher message dispatcher.
* @param exceptionHandler Handler for subscriber exceptions. * @param exceptionHandler Handler for subscriber exceptions.
*/ */
open class Bus( open class Bus<T : Any>(
override val identifier: String, override val identifier: String,
override val executor: Executor, override val executor: Executor,
val dispatcher: Dispatcher<Any>, val dispatcher: Dispatcher<T>,
val exceptionHandler: SubscriberExceptionHandler<Any>, val exceptionHandler: SubscriberExceptionHandler<T>,
) : IBus<Any> { ) : IBus<T> {
private val subscribers: SubscriberRegistry<Any> = SubscriberRegistry(this) val DEAD_TOPIC = "DEAD_TOPIC"
private val subscribers: SubscriberRegistry<T> = SubscriberRegistry(this)
/** /**
* Creates a new MessageBus with the given `identifier`. * Creates a new MessageBus with the given `identifier`.
@@ -178,18 +172,76 @@ open class Bus(
* @param exceptionHandler Handler for subscriber exceptions. * @param exceptionHandler Handler for subscriber exceptions.
* @since 16.0 * @since 16.0
*/ */
constructor(exceptionHandler: SubscriberExceptionHandler<Any>) : this( constructor(exceptionHandler: SubscriberExceptionHandler<T>) : this(
"default", "default",
Executor { it.run() }, Executor { it.run() },
Dispatcher.perThreadDispatchQueue(), Dispatcher.perThreadDispatchQueue(),
exceptionHandler exceptionHandler
) )
/**
* Creates a new AsyncMessageBus that will use `executor` to dispatch messages. Assigns `identifier` as the bus's name for logging purposes.
*
* @param identifier short name for the bus, for logging purposes.
* @param executor Executor to use to dispatch messages. It is the caller's responsibility to shut
* down the executor after the last message has been posted to this message bus.
*/
constructor(identifier: String, executor: Executor) : this(
identifier,
executor,
Dispatcher.legacyAsync(),
LoggingHandler()
)
/**
* Creates a new AsyncMessageBus that will use `executor` to dispatch messages.
*
* @param executor Executor to use to dispatch messages. It is the caller's responsibility to shut
* down the executor after the last message has been posted to this message bus.
* @param subscriberExceptionHandler Handler used to handle exceptions thrown from subscribers.
* See [SubscriberExceptionHandler] for more information.
* @since 16.0
*/
constructor(executor: Executor, subscriberExceptionHandler: SubscriberExceptionHandler<T>) : this(
"default",
executor,
Dispatcher.legacyAsync(),
subscriberExceptionHandler
)
/**
* Creates a new AsyncMessageBus that will use `executor` to dispatch messages.
*
* @param executor Executor to use to dispatch messages. It is the caller's responsibility to shut
* down the executor after the last message has been posted to this message bus.
*/
constructor(executor: Executor) : this(
"default",
executor,
Dispatcher.legacyAsync(),
LoggingHandler()
)
override fun register(topics: Array<String>, subscriber: Any) {
subscribers.register(topics, subscriber)
}
/**
* Registers subscriber method on `object` to receive messages.
*
* @param topics method subscribe topic.
* @param subscriber subscriber method declare object.
* @param method subscriber method should be registered.
*/
override fun register(topics: Array<String>, subscriber: Any, method: Method) {
subscribers.register(topics, subscriber, method)
}
/** /**
* Registers all subscriber methods on `object` to receive messages. * Registers all subscriber methods on `object` to receive messages.
* *
* @param subscriber object whose subscriber methods should be registered. * @param subscriber object whose subscriber methods should be registered.
*/ */
override fun register(subscriber: Any) { override fun register(subscriber: Any) {
subscribers.register(subscriber) subscribers.register(subscriber)
@@ -202,37 +254,73 @@ open class Bus(
/** /**
* Unregisters all subscriber methods on a registered `object`. * Unregisters all subscriber methods on a registered `object`.
* *
* @param subscriber object whose subscriber methods should be unregistered. * @param subscriber object whose subscriber methods should be unregistered.
* @throws IllegalArgumentException if the object was not previously registered. * @throws IllegalArgumentException if the object was not previously registered.
*/ */
override fun unregister(subscriber: Any) { override fun unregister(subscriber: Any) {
subscribers.unregister(subscriber) subscribers.unregister(subscriber)
} }
override fun unregister(topic: String, subscriber: Any) {
subscribers.unregister(topic, subscriber)
}
/** /**
* Posts an message to all registered subscribers. This method will return successfully after the * Posts a message to all registered subscribers. This method will return successfully after the
* message has been posted to all subscribers, and regardless of any exceptions thrown by * message has been posted to all subscribers, and regardless of any exceptions thrown by
* subscribers. * subscribers.
* *
*
* If no subscribers have been subscribed for `message`'s class, and `message` is not
* already a [DeadMessage], it will be wrapped in a DeadMessage and reposted.
*
* @param message message to post. * @param message message to post.
*/ */
override fun publish(message: Any) { override fun publish(message: T) {
val messageSubscribers = subscribers.getSubscribers(message) val messageSubscribers = subscribers.getSubscribers(message::class.java.name)
if (messageSubscribers.hasNext()) { if (messageSubscribers.hasNext()) {
dispatcher.dispatch(message, messageSubscribers) dispatcher.dispatch(message, messageSubscribers)
} else if (message !is DeadMessage) { } else {
// the message had no subscribers and was not itself a DeadMessage // the message had no subscribers and was not itself a DeadMessage
publish(DeadMessage(this, message)) publish(DEAD_TOPIC, message)
} }
} }
/**
* Posts a message to all registered subscribers. This method will return successfully after the
* message has been posted to all subscribers, and regardless of any exceptions thrown by
* subscribers.
*
* @param message message to post.
*/
override fun publishAsync(message: T) {
val messageSubscribers = subscribers.getSubscribers(message::class.java.name)
if (messageSubscribers.hasNext()) {
dispatcher.dispatchAsync(message, messageSubscribers)
} else {
// the message had no subscribers and was not itself a DeadMessage
publishAsync(DEAD_TOPIC, message)
}
}
override fun publish(topic: String, message: T) {
val messageSubscribers = subscribers.getSubscribers(topic)
if (messageSubscribers.hasNext()) {
dispatcher.dispatch(message, messageSubscribers)
} else if (topic != DEAD_TOPIC) {
// the message had no subscribers and was not itself a DeadMessage
publish(DEAD_TOPIC, message)
}
}
override fun publishAsync(topic: String, message: T) {
val messageSubscribers = subscribers.getSubscribers(topic)
if (messageSubscribers.hasNext()) {
dispatcher.dispatchAsync(message, messageSubscribers)
} else if (topic != DEAD_TOPIC) {
// the message had no subscribers and was not itself a DeadMessage
publishAsync(DEAD_TOPIC, message)
}
}
/** Handles the given exception thrown by a subscriber with the given context. */ /** Handles the given exception thrown by a subscriber with the given context. */
override fun handleException(cause: Throwable?, context: SubscriberExceptionContext<Any>) { override fun handleException(cause: Throwable?, context: SubscriberExceptionContext<T>) {
try { try {
exceptionHandler.handleException(cause, context) exceptionHandler.handleException(cause, context)
} catch (e2: Throwable) { } catch (e2: Throwable) {

View File

@@ -14,7 +14,7 @@
package com.synebula.gaea.bus package com.synebula.gaea.bus
/** /**
* Wraps an message that was posted, but which had no subscribers and thus could not be delivered. * Wraps a message that was posted, but which had no subscribers and thus could not be delivered.
* *
* *
* Registering a DeadMessage subscriber is useful for debugging or logging, as it can detect * Registering a DeadMessage subscriber is useful for debugging or logging, as it can detect

View File

@@ -23,13 +23,20 @@ import java.util.concurrent.ConcurrentLinkedQueue
* *
* **Note:** The dispatcher is orthogonal to the subscriber's `Executor`. The dispatcher * **Note:** The dispatcher is orthogonal to the subscriber's `Executor`. The dispatcher
* controls the order in which messages are dispatched, while the executor controls how (i.e. on which * controls the order in which messages are dispatched, while the executor controls how (i.e. on which
* thread) the subscriber is actually called when an message is dispatched to it. * thread) the subscriber is actually called when a message is dispatched to it.
* *
* @author Colin Decker * @author Colin Decker
*/ */
abstract class Dispatcher<T : Any> { abstract class Dispatcher<T : Any> {
/** Dispatches the given `message` to the given `subscribers`. */ /** Dispatches the given `message` to the given `subscribers`. */
abstract fun dispatch(message: T, subscribers: Iterator<Subscriber<T>>?) fun dispatch(message: T, subscribers: Iterator<Subscriber<T>>?) {
while (subscribers!!.hasNext()) {
subscribers.next().dispatch(message)
}
}
/** Dispatches the given `message` to the given `subscribers`. */
abstract fun dispatchAsync(message: T, subscribers: Iterator<Subscriber<T>>?)
/** Implementation of a [.perThreadDispatchQueue] dispatcher. */ /** Implementation of a [.perThreadDispatchQueue] dispatcher. */
private class PerThreadQueuedDispatcher<T : Any> : Dispatcher<T>() { private class PerThreadQueuedDispatcher<T : Any> : Dispatcher<T>() {
@@ -48,7 +55,7 @@ abstract class Dispatcher<T : Any> {
} }
} }
override fun dispatch(message: T, subscribers: Iterator<Subscriber<T>>?) { override fun dispatchAsync(message: T, subscribers: Iterator<Subscriber<T>>?) {
val queueForThread = queue.get() val queueForThread = queue.get()
queueForThread.offer(Message(message, subscribers)) queueForThread.offer(Message(message, subscribers))
if (!dispatching.get()) { if (!dispatching.get()) {
@@ -57,7 +64,7 @@ abstract class Dispatcher<T : Any> {
var nextMessage: Message<T>? var nextMessage: Message<T>?
while (queueForThread.poll().also { nextMessage = it } != null) { while (queueForThread.poll().also { nextMessage = it } != null) {
while (nextMessage!!.subscribers!!.hasNext()) { while (nextMessage!!.subscribers!!.hasNext()) {
nextMessage!!.subscribers!!.next().dispatchMessage(nextMessage!!.message) nextMessage!!.subscribers!!.next().dispatchAsync(nextMessage!!.message)
} }
} }
} finally { } finally {
@@ -91,32 +98,23 @@ abstract class Dispatcher<T : Any> {
// in some cases. // in some cases.
/** Global message queue. */ /** Global message queue. */
private val queue = ConcurrentLinkedQueue<MessageWithSubscriber<T>>() private val queue = ConcurrentLinkedQueue<MessageWithSubscriber<T>>()
override fun dispatch(message: T, subscribers: Iterator<Subscriber<T>>?) { override fun dispatchAsync(message: T, subscribers: Iterator<Subscriber<T>>?) {
while (subscribers!!.hasNext()) { while (subscribers!!.hasNext()) {
queue.add(MessageWithSubscriber(message, subscribers.next())) queue.add(MessageWithSubscriber(message, subscribers.next()))
} }
var e: MessageWithSubscriber<T>? var e: MessageWithSubscriber<T>?
while (queue.poll().also { e = it } != null) { while (queue.poll().also { e = it } != null) {
e!!.subscriber!!.dispatchMessage(e!!.message) e!!.subscriber!!.dispatchAsync(e!!.message)
} }
} }
private class MessageWithSubscriber<T : Any>(val message: T, val subscriber: Subscriber<T>?) private class MessageWithSubscriber<T : Any>(val message: T, val subscriber: Subscriber<T>?)
} }
/** Implementation of [.immediate]. */
private class ImmediateDispatcher<T : Any> : Dispatcher<T>() {
override fun dispatch(message: T, subscribers: Iterator<Subscriber<T>>?) {
while (subscribers!!.hasNext()) {
subscribers.next().dispatchMessage(message)
}
}
}
companion object { companion object {
/** /**
* Returns a dispatcher that queues messages that are posted reentrantly on a thread that is already * Returns a dispatcher that queues messages that are posted reentrantly on a thread that is already
* dispatching an message, guaranteeing that all messages posted on a single thread are dispatched to * dispatching a message, guaranteeing that all messages posted on a single thread are dispatched to
* all subscribers in the order they are posted. * all subscribers in the order they are posted.
* *
* *
@@ -138,14 +136,5 @@ abstract class Dispatcher<T : Any> {
fun <T : Any> legacyAsync(): Dispatcher<T> { fun <T : Any> legacyAsync(): Dispatcher<T> {
return LegacyAsyncDispatcher() return LegacyAsyncDispatcher()
} }
/**
* Returns a dispatcher that dispatches messages to subscribers immediately as they're posted
* without using an intermediate queue to change the dispatch order. This is effectively a
* depth-first dispatch order, vs. breadth-first when using a queue.
*/
fun <T : Any> immediate(): Dispatcher<T> {
return ImmediateDispatcher()
}
} }
} }

View File

@@ -33,6 +33,49 @@ interface IBus<T : Any> {
*/ */
fun publish(message: T) fun publish(message: T)
/**
* 异步发布事件
* @param message 事件
*/
fun publishAsync(message: T)
/**
* 注册事件Listener
* @param topics 主题
* @param subscriber subscriber对象
*/
fun register(topics: Array<String>, subscriber: Any)
/**
* 注册事件Listener
* @param topics 主题
* @param subscriber subscriber对象
* @param method Listener方法
*/
fun register(topics: Array<String>, subscriber: Any, method: Method)
/**
* 取消注册事件Listener
* @param topic 主题
* @param subscriber subscriber对象
*/
fun unregister(topic: String, subscriber: Any)
/**
* 同步发布事件
* @param topic 主题
* @param message 事件
*/
fun publish(topic: String, message: T)
/**
* 异步发布事件
* @param topic 主题
* @param message 事件
*/
fun publishAsync(topic: String, message: T)
/** /**
* 异常处理 * 异常处理

View File

@@ -13,6 +13,7 @@
*/ */
package com.synebula.gaea.bus package com.synebula.gaea.bus
import com.synebula.gaea.exception.NoticeUserException
import java.lang.reflect.InvocationTargetException import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method import java.lang.reflect.Method
import java.util.concurrent.Executor import java.util.concurrent.Executor
@@ -44,8 +45,13 @@ open class Subscriber<T : Any> private constructor(
executor = bus.executor executor = bus.executor
} }
/** Dispatches `message` to this subscriber . */
fun dispatch(message: Any) {
invokeSubscriberMethod(message)
}
/** Dispatches `message` to this subscriber using the proper executor. */ /** Dispatches `message` to this subscriber using the proper executor. */
fun dispatchMessage(message: Any) { fun dispatchAsync(message: Any) {
executor!!.execute { executor!!.execute {
try { try {
invokeSubscriberMethod(message) invokeSubscriberMethod(message)
@@ -68,8 +74,8 @@ open class Subscriber<T : Any> private constructor(
} catch (e: IllegalAccessException) { } catch (e: IllegalAccessException) {
throw Error("Method became inaccessible: $message", e) throw Error("Method became inaccessible: $message", e)
} catch (e: InvocationTargetException) { } catch (e: InvocationTargetException) {
if (e.cause is Error) { if (e.cause is Error || e.cause is NoticeUserException) {
throw (e.cause as Error?)!! throw e.cause!!
} }
throw e throw e
} }

View File

@@ -31,50 +31,75 @@ open class SubscriberRegistry<T : Any>(private val bus: IBus<T>) {
* The [CopyOnWriteArraySet] values make it easy and relatively lightweight to get an * The [CopyOnWriteArraySet] values make it easy and relatively lightweight to get an
* immutable snapshot of all current subscribers to a message without any locking. * immutable snapshot of all current subscribers to a message without any locking.
*/ */
private val subscribers = ConcurrentHashMap<Class<*>, CopyOnWriteArraySet<Subscriber<T>>>() private val subscribers = ConcurrentHashMap<String, CopyOnWriteArraySet<Subscriber<T>>>()
open fun register(topics: Array<String>, subscriber: Any) {
/** Registers all subscriber methods on the given subscriber object. */
open fun register(subscriber: Any) {
val listenerMethods = findAllSubscribers(subscriber) val listenerMethods = findAllSubscribers(subscriber)
for ((eventType, messageMethodsInListener) in listenerMethods) { for (topic in topics) {
var messageSubscribers = subscribers[eventType] for ((_, messageMethodsInListener) in listenerMethods) {
var messageSubscribers = subscribers[topic]
if (messageSubscribers == null) {
val newSet = CopyOnWriteArraySet<Subscriber<T>>()
messageSubscribers = subscribers.putIfAbsent(topic, newSet) ?: newSet
}
messageSubscribers.addAll(messageMethodsInListener)
}
}
}
open fun register(topics: Array<String>, subscriber: Any, method: Method) {
for (topic in topics) {
var messageSubscribers = subscribers[topic]
if (messageSubscribers == null) { if (messageSubscribers == null) {
val newSet = CopyOnWriteArraySet<Subscriber<T>>() val newSet = CopyOnWriteArraySet<Subscriber<T>>()
messageSubscribers = subscribers.putIfAbsent(eventType, newSet) ?: newSet messageSubscribers = subscribers.putIfAbsent(topic, newSet) ?: newSet
}
messageSubscribers.add(Subscriber.create(this.bus, subscriber, method))
}
}
/** Registers all subscriber methods on the given subscriber object. */
fun register(subscriber: Any) {
val listenerMethods = findAllSubscribers(subscriber)
for ((topic, messageMethodsInListener) in listenerMethods) {
var messageSubscribers = subscribers[topic]
if (messageSubscribers == null) {
val newSet = CopyOnWriteArraySet<Subscriber<T>>()
messageSubscribers = subscribers.putIfAbsent(topic, newSet) ?: newSet
} }
messageSubscribers.addAll(messageMethodsInListener) messageSubscribers.addAll(messageMethodsInListener)
} }
} }
/** Registers subscriber method on the given subscriber object. */ /** Registers subscriber method on the given subscriber object. */
open fun register(subscriber: Any, method: Method) { fun register(subscriber: Any, method: Method) {
val parameterTypes = method.parameterTypes val parameterTypes = method.parameterTypes
val eventType = parameterTypes[0]
check(parameterTypes.size == 1) { check(parameterTypes.size == 1) {
"Method $method has @SubscribeTopic annotation but has ${parameterTypes.size} parameters. Subscriber methods must have exactly 1 parameter." "Method $method has @SubscribeTopic annotation but has ${parameterTypes.size} parameters. Subscriber methods must have exactly 1 parameter."
} }
check(!parameterTypes[0].isPrimitive) { check(!parameterTypes[0].isPrimitive) {
"@SubscribeTopic method $method's parameter is ${parameterTypes[0].name}. Subscriber methods cannot accept primitives. " "@SubscribeTopic method $method's parameter is ${parameterTypes[0].name}. Subscriber methods cannot accept primitives. "
} }
var messageSubscribers = subscribers[eventType]
val eventType = parameterTypes[0]
val topic = eventType.name
var messageSubscribers = subscribers[topic]
if (messageSubscribers == null) { if (messageSubscribers == null) {
val newSet = CopyOnWriteArraySet<Subscriber<T>>() val newSet = CopyOnWriteArraySet<Subscriber<T>>()
messageSubscribers = subscribers.putIfAbsent(eventType, newSet) ?: newSet messageSubscribers = subscribers.putIfAbsent(topic, newSet) ?: newSet
} }
messageSubscribers.add(Subscriber.create(bus, subscriber, method)) messageSubscribers.add(Subscriber.create(bus, subscriber, method))
} }
/** Unregisters all subscribers on the given subscriber object. */
/** Unregisters all subscribers on the given subscriber object. */ fun unregister(subscriber: Any) {
open fun unregister(subscriber: Any) {
val listenerMethods = findAllSubscribers(subscriber) val listenerMethods = findAllSubscribers(subscriber)
for ((eventType, listenerMethodsForType) in listenerMethods) { for ((topic, listenerMethodsForType) in listenerMethods) {
val currentSubscribers = subscribers[eventType] val currentSubscribers = subscribers[topic]
require(!(currentSubscribers == null || !currentSubscribers.removeAll(listenerMethodsForType.toSet()))) { require(!(currentSubscribers == null || !currentSubscribers.removeAll(listenerMethodsForType.toSet()))) {
// if removeAll returns true, all we really know is that at least one subscriber was // if removeAll returns true, all we really know is that at least one subscriber was
// removed... however, barring something very strange we can assume that if at least one // removed... however, barring something very strange we can assume that if at least one
// subscriber was removed, all subscribers on subscriber for that message type were... after // subscriber was removed, all subscribers on subscriber for that message type were... after
// all, the definition of subscribers on a particular class is totally static // all, the definition of subscribers on a particular class is totally static
"missing message subscriber for an annotated method. Is $subscriber registered?" "missing message subscriber for an annotated method. Is $subscriber registered?"
} }
@@ -84,30 +109,55 @@ open class SubscriberRegistry<T : Any>(private val bus: IBus<T>) {
} }
} }
/** Unregisters all subscribers by topic on the given subscriber object. */
open fun unregister(topic: String, subscriber: Any) {
val listenerMethods = findAllSubscribers(subscriber)
if (listenerMethods[topic] == null) return //不包含topic则推出
val currentSubscribers = subscribers[topic]
require(!(currentSubscribers == null || !currentSubscribers.removeAll(listenerMethods[topic]!!.toSet()))) {
// if removeAll returns true, all we really know is that at least one subscriber was
// removed... however, barring something very strange we can assume that if at least one
// subscriber was removed, all subscribers on subscriber for that message type were... after
// all, the definition of subscribers on a particular class is totally static
"missing message subscriber for an annotated method. Is $subscriber registered?"
}
// don't try to remove the set if it's empty; that can't be done safely without a lock
// anyway, if the set is empty it'll just be wrapping an array of length 0
}
/** /**
* Gets an iterator representing an immutable snapshot of all subscribers to the given message at * Gets an iterator representing an immutable snapshot of all subscribers to the given message at
* the time this method is called. * the time this method is called.
*/ */
fun getSubscribers(eventType: Any): Iterator<Subscriber<T>> { fun getSubscribers(topic: String): Iterator<Subscriber<T>> {
val eventSubscribers: CopyOnWriteArraySet<Subscriber<T>> = val topicSubscribers: CopyOnWriteArraySet<Subscriber<T>> = subscribers[topic] ?: CopyOnWriteArraySet()
subscribers[eventType.javaClass] ?: CopyOnWriteArraySet() return topicSubscribers.iterator()
return eventSubscribers.iterator()
} }
/** /**
* Returns all subscribers for the given subscriber grouped by the type of message they subscribe to. * Returns all subscribers for the given subscriber grouped by the type of message they subscribe to.
*/ */
private fun findAllSubscribers(subscriber: Any): Map<Class<*>, List<Subscriber<T>>> { private fun findAllSubscribers(subscriber: Any): Map<String, List<Subscriber<T>>> {
val methodsInListener = mutableMapOf<Class<*>, List<Subscriber<T>>>() val methodsInListener = mutableMapOf<String, MutableList<Subscriber<T>>>()
val clazz: Class<*> = subscriber.javaClass val clazz: Class<*> = subscriber.javaClass
for (method in getAnnotatedMethods(clazz)) { for (method in getAnnotatedMethods(clazz)) {
val parameterTypes = method.parameterTypes var topics = method.getAnnotation(Subscribe::class.java).topics
val eventType = parameterTypes[0]
val methods = methodsInListener[eventType]?.toMutableList() ?: mutableListOf() //如果没有定义topic则使用消息类名称做topic
methods.add(Subscriber.create(bus, subscriber, method)) if (topics.isEmpty()) {
methodsInListener[eventType] = methods val parameterTypes = method.parameterTypes
val messageType = parameterTypes[0]
topics = arrayOf(messageType.name)
}
for (topic in topics) {
val listeners = methodsInListener.getOrDefault(topic, mutableListOf())
listeners.add(Subscriber.create(bus, subscriber, method))
methodsInListener[topic] = listeners
}
} }
return methodsInListener.toMap() return methodsInListener
} }

View File

@@ -1,71 +0,0 @@
/*
* Copyright (C) 2007 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.synebula.gaea.bus.messagebus
import com.synebula.gaea.bus.Dispatcher
import com.synebula.gaea.bus.LoggingHandler
import com.synebula.gaea.bus.SubscriberExceptionHandler
import java.util.concurrent.Executor
/**
* An [MessageBus] that takes the Executor of your choice and uses it to dispatch events,
* allowing dispatch to occur asynchronously.
*
* @author Cliff
* @since 10.0
*/
class AsyncMessageBus<T : Any> : MessageBus<T> {
/**
* Creates a new AsyncEventBus that will use `executor` to dispatch events. Assigns `identifier` as the bus's name for logging purposes.
*
* @param identifier short name for the bus, for logging purposes.
* @param executor Executor to use to dispatch events. It is the caller's responsibility to shut
* down the executor after the last event has been posted to this event bus.
*/
constructor(identifier: String, executor: Executor) : super(
identifier,
executor,
Dispatcher.legacyAsync(),
LoggingHandler()
)
/**
* Creates a new AsyncEventBus that will use `executor` to dispatch events.
*
* @param executor Executor to use to dispatch events. It is the caller's responsibility to shut
* down the executor after the last event has been posted to this event bus.
* @param subscriberExceptionHandler Handler used to handle exceptions thrown from subscribers.
* See [SubscriberExceptionHandler] for more information.
* @since 16.0
*/
constructor(executor: Executor, subscriberExceptionHandler: SubscriberExceptionHandler<T>) : super(
"default",
executor,
Dispatcher.legacyAsync(),
subscriberExceptionHandler
)
/**
* Creates a new AsyncEventBus that will use `executor` to dispatch events.
*
* @param executor Executor to use to dispatch events. It is the caller's responsibility to shut
* down the executor after the last event has been posted to this event bus.
*/
constructor(executor: Executor) : super(
"default",
executor,
Dispatcher.legacyAsync(),
LoggingHandler()
)
}

View File

@@ -1,39 +0,0 @@
package com.synebula.gaea.bus.messagebus
import com.synebula.gaea.bus.IBus
import java.lang.reflect.Method
interface IMessageBus<T : Any> : IBus<T> {
/**
* 注册事件Listener
* @param topics 主题
* @param subscriber subscriber对象
*/
fun register(topics: Array<String>, subscriber: Any)
/**
* 注册事件Listener
* @param topics 主题
* @param subscriber subscriber对象
* @param method Listener方法
*/
fun register(topics: Array<String>, subscriber: Any, method: Method)
/**
* 取消注册事件Listener
* @param topic 主题
* @param subscriber subscriber对象
*/
fun unregister(topic: String, subscriber: Any)
/**
* 同步发布事件
* @param topic 主题
* @param message 事件
*/
fun publish(topic: String, message: T)
}

View File

@@ -1,280 +0,0 @@
/*
* Copyright (C) 2007 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.synebula.gaea.bus.messagebus
import com.synebula.gaea.bus.*
import java.lang.reflect.Method
import java.util.*
import java.util.concurrent.Executor
import java.util.logging.Level
import java.util.logging.Logger
/**
* Dispatches events to listeners, and provides ways for listeners to register themselves.
*
* <h2>Avoid EventBus</h2>
*
*
* **We recommend against using EventBus.** It was designed many years ago, and newer
* libraries offer better ways to decouple components and react to events.
*
*
* To decouple components, we recommend a dependency-injection framework. For Android code, most
* apps use [Dagger](https://dagger.dev). For server code, common options include [Guice](https://github.com/google/guice/wiki/Motivation) and [Spring](https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-introduction).
* Frameworks typically offer a way to register multiple listeners independently and then request
* them together as a set ([Dagger](https://dagger.dev/dev-guide/multibindings), [Guice](https://github.com/google/guice/wiki/Multibindings), [Spring](https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-autowired-annotation)).
*
*
* To react to events, we recommend a reactive-streams framework like [RxJava](https://github.com/ReactiveX/RxJava/wiki) (supplemented with its [RxAndroid](https://github.com/ReactiveX/RxAndroid) extension if you are building for
* Android) or [Project Reactor](https://projectreactor.io/). (For the basics of
* translating code from using an event bus to using a reactive-streams framework, see these two
* guides: [1](https://blog.jkl.gg/implementing-an-event-bus-with-rxjava-rxbus/), [2](https://lorentzos.com/rxjava-as-event-bus-the-right-way-10a36bdd49ba).) Some usages
* of EventBus may be better written using [Kotlin coroutines](https://kotlinlang.org/docs/coroutines-guide.html), including [Flow](https://kotlinlang.org/docs/flow.html) and [Channels](https://kotlinlang.org/docs/channels.html). Yet other usages are better served
* by individual libraries that provide specialized support for particular use cases.
*
*
* Disadvantages of EventBus include:
*
*
* * It makes the cross-references between producer and subscriber harder to find. This can
* complicate debugging, lead to unintentional reentrant calls, and force apps to eagerly
* initialize all possible subscribers at startup time.
* * It uses reflection in ways that break when code is processed by optimizers/minimizer like
* [R8 and Proguard](https://developer.android.com/studio/build/shrink-code).
* * It doesn't offer a way to wait for multiple events before taking action. For example, it
* doesn't offer a way to wait for multiple producers to all report that they're "ready," nor
* does it offer a way to batch multiple events from a single producer together.
* * It doesn't support backpressure and other features needed for resilience.
* * It doesn't provide much control of threading.
* * It doesn't offer much monitoring.
* * It doesn't propagate exceptions, so apps don't have a way to react to them.
* * It doesn't interoperate well with RxJava, coroutines, and other more commonly used
* alternatives.
* * It imposes requirements on the lifecycle of its subscribers. For example, if an event
* occurs between when one subscriber is removed and the next subscriber is added, the event
* is dropped.
* * Its performance is suboptimal, especially under Android.
* * It [doesn't support parameterized
* types](https://github.com/google/guava/issues/1431).
* * With the introduction of lambdas in Java 8, EventBus went from less verbose than listeners
* to [more verbose](https://github.com/google/guava/issues/3311).
*
*
* <h2>EventBus Summary</h2>
*
*
* The EventBus allows publish-subscribe-style communication between components without requiring
* the components to explicitly register with one another (and thus be aware of each other). It is
* designed exclusively to replace traditional Java in-process event distribution using explicit
* registration. It is *not* a general-purpose publish-subscribe system, nor is it intended
* for interprocess communication.
*
* <h2>Receiving Events</h2>
*
*
* To receive events, an object should:
*
*
* 1. Expose a public method, known as the *event subscriber*, which accepts a single
* argument of the type of event desired;
* 1. Mark it with a [Subscribe] annotation;
* 1. Pass itself to an EventBus instance's [.register] method.
*
*
* <h2>Posting Events</h2>
*
*
* To post an event, simply provide the event object to the [.post] method. The
* EventBus instance will determine the type of event and route it to all registered listeners.
*
*
* Events are routed based on their type an event will be delivered to any subscriber for
* any type to which the event is *assignable.* This includes implemented interfaces, all
* superclasses, and all interfaces implemented by superclasses.
*
*
* When `post` is called, all registered subscribers for an event are run in sequence, so
* subscribers should be reasonably quick. If an event may trigger an extended process (such as a
* database load), spawn a thread or queue it for later. (For a convenient way to do this, use an
* [AsyncMessageBus].)
*
* <h2>Subscriber Methods</h2>
*
*
* Event subscriber methods must accept only one argument: the event.
*
*
* Subscribers should not, in general, throw. If they do, the EventBus will catch and log the
* exception. This is rarely the right solution for error handling and should not be relied upon; it
* is intended solely to help find problems during development.
*
*
* The EventBus guarantees that it will not call a subscriber method from multiple threads
* simultaneously, unless the method explicitly allows it by bearing the [ ] annotation. If this annotation is not present, subscriber methods need not
* worry about being reentrant, unless also called from outside the EventBus.
*
* <h2>Dead Events</h2>
*
*
* If an event is posted, but no registered subscribers can accept it, it is considered "dead."
* To give the system a second chance to handle dead events, they are wrapped in an instance of
* [DeadMessage] and reposted.
*
*
* If a subscriber for a supertype of all events (such as Object) is registered, no event will
* ever be considered dead, and no DeadEvents will be generated. Accordingly, while DeadMessage
* extends [Object], a subscriber registered to receive any Object will never receive a
* DeadMessage.
*
*
* This class is safe for concurrent use.
*
*
* See the Guava User Guide article on [`EventBus`](https://github.com/google/guava/wiki/EventBusExplained).
*
* @author Cliff
* @param identifier a brief name for this bus, for logging purposes. Should/home/alex/privacy/project/myths/gaea be a valid Java
* @param executor the default executor this event bus uses for dispatching events to subscribers.
* @param dispatcher message dispatcher.
* @param exceptionHandler Handler for subscriber exceptions.
*/
open class MessageBus<T : Any> internal constructor(
override val identifier: String,
override val executor: Executor,
val dispatcher: Dispatcher<T>,
val exceptionHandler: SubscriberExceptionHandler<T>,
) : IMessageBus<T> {
val DEAD_TOPIC = "DEAD_TOPIC"
val subscribers = TopicSubscriberRegistry(this)
/**
* Creates a new EventBus with the given `identifier`.
*
* @param identifier a brief name for this bus, for logging purposes. Should/home/alex/privacy/project/myths/gaea be a valid Java
* identifier.
*/
@JvmOverloads
constructor(identifier: String = "default") : this(
identifier,
Executor { it.run() },
Dispatcher.perThreadDispatchQueue(),
LoggingHandler()
)
/**
* Creates a new EventBus with the given [SubscriberExceptionHandler].
*
* @param exceptionHandler Handler for subscriber exceptions.
* @since 16.0
*/
constructor(exceptionHandler: SubscriberExceptionHandler<T>) : this(
"default",
Executor { it.run() },
Dispatcher.perThreadDispatchQueue(),
exceptionHandler
)
override fun register(topics: Array<String>, subscriber: Any) {
subscribers.register(topics, subscriber)
}
/**
* Registers subscriber method on `object` to receive messages.
*
* @param topics method subscribe topic.
* @param subscriber subscriber method declare object.
* @param method subscriber method should be registered.
*/
override fun register(topics: Array<String>, subscriber: Any, method: Method) {
subscribers.register(topics, subscriber, method)
}
/**
* Registers all subscriber methods on `object` to receive messages.
*
* @param subscriber object whose subscriber methods should be registered.
*/
override fun register(subscriber: Any) {
subscribers.register(subscriber)
}
override fun register(subscriber: Any, method: Method) {
subscribers.register(subscriber, method)
}
/**
* Unregisters all subscriber methods on a registered `object`.
*
* @param subscriber object whose subscriber methods should be unregistered.
* @throws IllegalArgumentException if the object was not previously registered.
*/
override fun unregister(subscriber: Any) {
subscribers.unregister(subscriber)
}
override fun unregister(topic: String, subscriber: Any) {
subscribers.unregister(topic, subscriber)
}
/**
* Posts an message to all registered subscribers. This method will return successfully after the
* message has been posted to all subscribers, and regardless of any exceptions thrown by
* subscribers.
*
* @param message message to post.
*/
override fun publish(message: T) {
val messageSubscribers = subscribers.getSubscribers(message.javaClass.name)
if (messageSubscribers.hasNext()) {
dispatcher.dispatch(message, messageSubscribers)
} else {
// the message had no subscribers and was not itself a DeadMessage
publish(DEAD_TOPIC, message)
}
}
override fun publish(topic: String, message: T) {
val messageSubscribers = subscribers.getSubscribers(topic)
if (messageSubscribers.hasNext()) {
dispatcher.dispatch(message, messageSubscribers)
} else if (topic != DEAD_TOPIC) {
// the message had no subscribers and was not itself a DeadMessage
publish(DEAD_TOPIC, message)
}
}
/** Handles the given exception thrown by a subscriber with the given context. */
override fun handleException(cause: Throwable?, context: SubscriberExceptionContext<T>) {
try {
exceptionHandler.handleException(cause, context)
} catch (e2: Throwable) {
// if the handler threw an exception... well, just log it
logger.log(
Level.SEVERE, String.format(Locale.ROOT, "Exception %s thrown while handling exception: %s", e2, cause),
e2
)
}
}
override fun toString(): String {
return "MessageBus(identifier='$identifier', executor=$executor, dispatcher=$dispatcher, exceptionHandler=$exceptionHandler, subscribers=$subscribers)"
}
companion object {
private val logger = Logger.getLogger(Bus::class.java.name)
}
}

View File

@@ -1,184 +0,0 @@
/*
* Copyright (C) 2014 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.synebula.gaea.bus.messagebus
import com.synebula.gaea.bus.Subscribe
import com.synebula.gaea.bus.Subscriber
import com.synebula.gaea.bus.SubscriberRegistry
import java.lang.reflect.Method
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArraySet
/**
* Registry of subscribers to a single message bus.
*
* @author Colin Decker
*/
open class TopicSubscriberRegistry<T : Any>(private val bus: IMessageBus<T>) : SubscriberRegistry<T>(bus) {
/**
* All registered subscribers, indexed by message type.
*
*
* The [CopyOnWriteArraySet] values make it easy and relatively lightweight to get an
* immutable snapshot of all current subscribers to an message without any locking.
*/
private val subscribers = ConcurrentHashMap<String, CopyOnWriteArraySet<Subscriber<T>>>()
open fun register(topics: Array<String>, subscriber: Any) {
val listenerMethods = findAllSubscribers(subscriber)
for (topic in topics) {
for ((_, messageMethodsInListener) in listenerMethods) {
var messageSubscribers = subscribers[topic]
if (messageSubscribers == null) {
val newSet = CopyOnWriteArraySet<Subscriber<T>>()
messageSubscribers = subscribers.putIfAbsent(topic, newSet) ?: newSet
}
messageSubscribers.addAll(messageMethodsInListener)
}
}
}
open fun register(topics: Array<String>, subscriber: Any, method: Method) {
for (topic in topics) {
var messageSubscribers = subscribers[topic]
if (messageSubscribers == null) {
val newSet = CopyOnWriteArraySet<Subscriber<T>>()
messageSubscribers = subscribers.putIfAbsent(topic, newSet) ?: newSet
}
messageSubscribers.add(Subscriber.create(this.bus, subscriber, method))
}
}
/** Registers all subscriber methods on the given subscriber object. */
override fun register(subscriber: Any) {
val listenerMethods = findAllSubscribers(subscriber)
for ((topic, messageMethodsInListener) in listenerMethods) {
var messageSubscribers = subscribers[topic]
if (messageSubscribers == null) {
val newSet = CopyOnWriteArraySet<Subscriber<T>>()
messageSubscribers = subscribers.putIfAbsent(topic, newSet) ?: newSet
}
messageSubscribers.addAll(messageMethodsInListener)
}
}
/** Registers subscriber method on the given subscriber object. */
override fun register(subscriber: Any, method: Method) {
val parameterTypes = method.parameterTypes
check(parameterTypes.size == 1) {
"Method $method has @SubscribeTopic annotation but has ${parameterTypes.size} parameters. Subscriber methods must have exactly 1 parameter."
}
check(!parameterTypes[0].isPrimitive) {
"@SubscribeTopic method $method's parameter is ${parameterTypes[0].name}. Subscriber methods cannot accept primitives. "
}
val eventType = parameterTypes[0]
val topic = eventType.name
var messageSubscribers = subscribers[topic]
if (messageSubscribers == null) {
val newSet = CopyOnWriteArraySet<Subscriber<T>>()
messageSubscribers = subscribers.putIfAbsent(topic, newSet) ?: newSet
}
messageSubscribers.add(Subscriber.create(bus, subscriber, method))
}
/** Unregisters all subscribers on the given subscriber object. */
override fun unregister(subscriber: Any) {
val listenerMethods = findAllSubscribers(subscriber)
for ((topic, listenerMethodsForType) in listenerMethods) {
val currentSubscribers = subscribers[topic]
require(!(currentSubscribers == null || !currentSubscribers.removeAll(listenerMethodsForType.toSet()))) {
// if removeAll returns true, all we really know is that at least one subscriber was
// removed... however, barring something very strange we can assume that if at least one
// subscriber was removed, all subscribers on subscriber for that message type were... after
// all, the definition of subscribers on a particular class is totally static
"missing message subscriber for an annotated method. Is $subscriber registered?"
}
// don't try to remove the set if it's empty; that can't be done safely without a lock
// anyway, if the set is empty it'll just be wrapping an array of length 0
}
}
/** Unregisters all subscribers by topic on the given subscriber object. */
open fun unregister(topic: String, subscriber: Any) {
val listenerMethods = findAllSubscribers(subscriber)
if (listenerMethods[topic] == null) return //不包含topic则推出
val currentSubscribers = subscribers[topic]
require(!(currentSubscribers == null || !currentSubscribers.removeAll(listenerMethods[topic]!!.toSet()))) {
// if removeAll returns true, all we really know is that at least one subscriber was
// removed... however, barring something very strange we can assume that if at least one
// subscriber was removed, all subscribers on subscriber for that message type were... after
// all, the definition of subscribers on a particular class is totally static
"missing message subscriber for an annotated method. Is $subscriber registered?"
}
// don't try to remove the set if it's empty; that can't be done safely without a lock
// anyway, if the set is empty it'll just be wrapping an array of length 0
}
/**
* Gets an iterator representing an immutable snapshot of all subscribers to the given message at
* the time this method is called.
*/
fun getSubscribers(topic: String): Iterator<Subscriber<T>> {
val topicSubscribers: CopyOnWriteArraySet<Subscriber<T>> = subscribers[topic] ?: CopyOnWriteArraySet()
return topicSubscribers.iterator()
}
/**
* Returns all subscribers for the given subscriber grouped by the type of message they subscribe to.
*/
private fun findAllSubscribers(subscriber: Any): Map<String, List<Subscriber<T>>> {
val methodsInListener = mutableMapOf<String, MutableList<Subscriber<T>>>()
val clazz: Class<*> = subscriber.javaClass
for (method in getAnnotatedMethods(clazz)) {
var topics = method.getAnnotation(Subscribe::class.java).topics
//如果没有定义topic则使用消息类名称做topic
if (topics.isEmpty()) {
val parameterTypes = method.parameterTypes
val messageType = parameterTypes[0]
topics = arrayOf(messageType.name)
}
for (topic in topics) {
val listeners = methodsInListener.getOrDefault(topic, mutableListOf())
listeners.add(Subscriber.create(bus, subscriber, method))
methodsInListener[topic] = listeners
}
}
return methodsInListener
}
class MethodIdentifier(method: Method) {
private val name: String = method.name
private val parameterTypes: List<Class<*>> = listOf(*method.parameterTypes)
override fun hashCode(): Int {
var result = name.hashCode()
for (element in parameterTypes) result = 31 * result + element.hashCode()
return result
}
override fun equals(other: Any?): Boolean {
if (other is MethodIdentifier) {
return name == other.name && parameterTypes == other.parameterTypes
}
return false
}
}
}

View File

@@ -0,0 +1,5 @@
package com.synebula.gaea.domain.event
import com.synebula.gaea.domain.model.IAggregateRoot
class AfterRemoveEvent<T : IAggregateRoot<I>, I>(var id: I? = null)

View File

@@ -0,0 +1,5 @@
package com.synebula.gaea.domain.event
import com.synebula.gaea.domain.model.IAggregateRoot
class BeforeRemoveEvent<T : IAggregateRoot<I>, I>(var id: I? = null)

View File

@@ -68,7 +68,7 @@ interface IRepository<TAggregateRoot : IAggregateRoot<ID>, ID> {
* @param params 查询条件。 * @param params 查询条件。
* @return int * @return int
*/ */
fun <TAggregateRoot> count(params: Map<String, Any>?): Int fun count(params: Map<String, Any>?): Int
} }

View File

@@ -1,9 +1,14 @@
package com.synebula.gaea.domain.service package com.synebula.gaea.domain.service
import com.synebula.gaea.bus.IBus
import com.synebula.gaea.data.message.DataMessage import com.synebula.gaea.data.message.DataMessage
import com.synebula.gaea.data.serialization.IObjectMapper import com.synebula.gaea.data.serialization.IObjectMapper
import com.synebula.gaea.domain.event.AfterRemoveEvent
import com.synebula.gaea.domain.event.BeforeRemoveEvent
import com.synebula.gaea.domain.model.IAggregateRoot import com.synebula.gaea.domain.model.IAggregateRoot
import com.synebula.gaea.domain.repository.IRepository import com.synebula.gaea.domain.repository.IRepository
import com.synebula.gaea.ext.firstCharLowerCase
import javax.annotation.Resource
/** /**
@@ -22,6 +27,8 @@ open class Service<TRoot : IAggregateRoot<ID>, ID>(
protected open var repository: IRepository<TRoot, ID>, protected open var repository: IRepository<TRoot, ID>,
protected open var mapper: IObjectMapper, protected open var mapper: IObjectMapper,
) : IService<ID> { ) : IService<ID> {
@Resource
protected open var bus: IBus<Any>? = null
/** /**
* 增加对象 * 增加对象
@@ -73,7 +80,17 @@ open class Service<TRoot : IAggregateRoot<ID>, ID>(
* @param id 对象ID * @param id 对象ID
*/ */
override fun remove(id: ID) { override fun remove(id: ID) {
val beforeRemoveEvent = BeforeRemoveEvent<TRoot, ID>(id)
this.bus?.publish(
"${this.clazz.simpleName.firstCharLowerCase()}${BeforeRemoveEvent::class.java.simpleName}",
beforeRemoveEvent
)
this.repository.remove(id) this.repository.remove(id)
val afterRemoveEvent = AfterRemoveEvent<TRoot, ID>(id)
this.bus?.publish(
"${this.clazz.simpleName.firstCharLowerCase()}${AfterRemoveEvent::class.java.simpleName}",
afterRemoveEvent
)
} }
/** /**

View File

@@ -1,8 +1,5 @@
package com.synebula.gaea.bus package com.synebula.gaea.bus
import com.synebula.gaea.bus.messagebus.AsyncMessageBus
import com.synebula.gaea.bus.messagebus.IMessageBus
import com.synebula.gaea.bus.messagebus.MessageBus
import org.junit.Test import org.junit.Test
import java.util.concurrent.Executors import java.util.concurrent.Executors
@@ -21,36 +18,36 @@ class BusTest {
@Test @Test
fun testAsyncBus() { fun testAsyncBus() {
val bus: IBus<Any> = AsyncBus(Executors.newFixedThreadPool(10)) val bus: IBus<Any> = Bus(Executors.newFixedThreadPool(10))
val subscriber = TestSubscriber() val subscriber = TestSubscriber()
bus.register(subscriber, subscriber.javaClass.declaredMethods[0]) bus.register(subscriber, subscriber.javaClass.declaredMethods[0])
bus.register(subscriber, subscriber.javaClass.declaredMethods[1]) bus.register(subscriber, subscriber.javaClass.declaredMethods[1])
bus.publish("Hello world") bus.publishAsync("Hello world")
bus.publish(subscriber) bus.publishAsync(subscriber)
} }
@Test @Test
fun testMessageBus() { fun testTopicBus() {
val bus: IMessageBus<Any> = MessageBus() val bus: IBus<Any> = Bus()
val subscriber = TestTopicSubscriber() val subscriber = TestTopicSubscriber()
bus.register(subscriber) bus.register(subscriber)
bus.publish("hello", "Hello world") bus.publish("hello", "Hello world")
bus.publish("whoami", subscriber) bus.publishAsync("whoami", subscriber)
} }
@Test @Test
fun testMessageBus2() { fun testAsyncTopicBus() {
val bus: IMessageBus<Any> = AsyncMessageBus(Executors.newFixedThreadPool(10)) val bus: IBus<Any> = Bus(Executors.newFixedThreadPool(10))
val subscriber = TestTopicSubscriber() val subscriber = TestTopicSubscriber()
val subscriber2 = TestTopicSubscriber2() val subscriber2 = TestTopicSubscriber2()
bus.register(subscriber) bus.register(subscriber)
bus.register(subscriber2) bus.register(subscriber2)
bus.publish("hello", "Hello world") bus.publishAsync("hello", "Hello world")
bus.publish("whoami", subscriber) bus.publishAsync("whoami", subscriber)
bus.unregister(subscriber2) bus.unregister(subscriber2)
bus.publish("hello", "Hello world") bus.publishAsync("hello", "Hello world")
bus.publish("whoami", subscriber) bus.publishAsync("whoami", subscriber)
} }
internal class TestSubscriber { internal class TestSubscriber {