diff --git a/build.gradle b/build.gradle index 128548a..345a5bc 100644 --- a/build.gradle +++ b/build.gradle @@ -17,7 +17,7 @@ buildscript { subprojects { group 'com.synebula' - version '1.2.0' + version '1.3.0' buildscript { repositories { diff --git a/src/gaea.app/src/main/kotlin/com/synebula/gaea/app/autoconfig/service/ServiceFactory.kt b/src/gaea.app/src/main/kotlin/com/synebula/gaea/app/autoconfig/service/ServiceFactory.kt index e3447dd..c3e33a2 100644 --- a/src/gaea.app/src/main/kotlin/com/synebula/gaea/app/autoconfig/service/ServiceFactory.kt +++ b/src/gaea.app/src/main/kotlin/com/synebula/gaea/app/autoconfig/service/ServiceFactory.kt @@ -7,9 +7,8 @@ import org.springframework.beans.factory.BeanFactory class ServiceFactory( supertype: Class<*>, var beanFactory: BeanFactory, - var implementBeanNames: Array = arrayOf() ) : Factory(supertype) { override fun createProxy(): Proxy { - return ServiceProxy(supertype, this.beanFactory, this.implementBeanNames) + return ServiceProxy(supertype, this.beanFactory) } } \ No newline at end of file diff --git a/src/gaea.app/src/main/kotlin/com/synebula/gaea/app/autoconfig/service/ServiceProxy.kt b/src/gaea.app/src/main/kotlin/com/synebula/gaea/app/autoconfig/service/ServiceProxy.kt index 05ec640..15519fe 100644 --- a/src/gaea.app/src/main/kotlin/com/synebula/gaea/app/autoconfig/service/ServiceProxy.kt +++ b/src/gaea.app/src/main/kotlin/com/synebula/gaea/app/autoconfig/service/ServiceProxy.kt @@ -1,49 +1,57 @@ package com.synebula.gaea.app.autoconfig.service +import com.synebula.gaea.bus.IBus import com.synebula.gaea.data.serialization.IObjectMapper import com.synebula.gaea.domain.repository.IRepository import com.synebula.gaea.domain.repository.IRepositoryFactory import com.synebula.gaea.domain.service.Domain import com.synebula.gaea.domain.service.IService import com.synebula.gaea.domain.service.Service +import com.synebula.gaea.exception.NoticeUserException import com.synebula.gaea.spring.autoconfig.Proxy import org.springframework.beans.factory.BeanFactory +import org.springframework.core.ResolvableType import java.io.InvalidClassException +import java.lang.reflect.InvocationTargetException import java.lang.reflect.Method class ServiceProxy( private var supertype: Class<*>, - private var beanFactory: BeanFactory, implementBeanNames: Array = arrayOf() + private var beanFactory: BeanFactory ) : Proxy() { private var service: IService<*> init { // 如果没有实现类, 使用Service类代理 - if (implementBeanNames.isEmpty()) { - // 如果没有实现类并且没有ServiceDependency注解, 则抛出异常 - if (!this.supertype.declaredAnnotations.any { it.annotationClass == Domain::class }) { - throw InvalidClassException( - "interface ${this.supertype.name} must has implementation class or annotation by ${Domain::class.qualifiedName}" - ) - } - val domain = this.supertype.getDeclaredAnnotation(Domain::class.java) - - // repository工厂对象 - val defaultRepositoryFactory = this.beanFactory.getBean(IRepositoryFactory::class.java) - val mapper = this.beanFactory.getBean(IObjectMapper::class.java) - - val constructor = Service::class.java.getConstructor( - Class::class.java, IRepository::class.java, IObjectMapper::class.java + // 如果没有实现类并且没有ServiceDependency注解, 则抛出异常 + if (!this.supertype.declaredAnnotations.any { it.annotationClass == Domain::class }) { + throw InvalidClassException( + "interface ${this.supertype.name} must has implementation class or annotation by ${Domain::class.qualifiedName}" ) - this.service = - constructor.newInstance( - domain.clazz.java, - defaultRepositoryFactory.createRawRepository(domain.clazz.java), - mapper - ) - } else { - this.service = this.beanFactory.getBean(implementBeanNames[0]) as IService<*> + } + val domain = this.supertype.getDeclaredAnnotation(Domain::class.java) + + // repository工厂对象 + val defaultRepositoryFactory = this.beanFactory.getBean(IRepositoryFactory::class.java) + val mapper = this.beanFactory.getBean(IObjectMapper::class.java) + + val constructor = Service::class.java.getConstructor( + Class::class.java, IRepository::class.java, IObjectMapper::class.java + ) + this.service = + constructor.newInstance( + domain.clazz.java, + defaultRepositoryFactory.createRawRepository(domain.clazz.java), + mapper + ) + + // 尝试注入IBus对象 + val bus = Service::class.java.getDeclaredField("bus") + val iBusObjectProvider = this.beanFactory.getBeanProvider>(ResolvableType.forField(bus)) + iBusObjectProvider.ifAvailable { busBean -> + bus.isAccessible = true + bus.set(this.service, busBean) } } @@ -61,6 +69,11 @@ class ServiceProxy( return proxyMethod.invoke(this.service, *args) } catch (ex: NoSuchMethodException) { throw NoSuchMethodException("method [${method.toGenericString()}] not implements in class [${this.service::class.java}], you must implements interface [${this.supertype.name}] ") + } catch (ex: InvocationTargetException) { + if (ex.cause is Error || ex.cause is NoticeUserException) { + throw ex.targetException!! + } + throw ex } } } \ No newline at end of file diff --git a/src/gaea.app/src/main/kotlin/com/synebula/gaea/app/autoconfig/service/ServiceRegister.kt b/src/gaea.app/src/main/kotlin/com/synebula/gaea/app/autoconfig/service/ServiceRegister.kt index d62878b..0dd1f0a 100644 --- a/src/gaea.app/src/main/kotlin/com/synebula/gaea/app/autoconfig/service/ServiceRegister.kt +++ b/src/gaea.app/src/main/kotlin/com/synebula/gaea/app/autoconfig/service/ServiceRegister.kt @@ -30,20 +30,20 @@ class ServiceRegister : Register() { // 尝试获取实际继承类型 val implBeanDefinitions = this.doScan(basePackages, arrayOf(this.interfaceFilter(arrayOf(beanClazz)))) - implBeanDefinitions.forEach { - it.isAutowireCandidate = false - result[it.beanClassName!!] = it + if (implBeanDefinitions.isNotEmpty()) { + implBeanDefinitions.forEach { + result[it.beanClassName!!] = it + } + } else { + // 构造BeanDefinition + val builder = BeanDefinitionBuilder.genericBeanDefinition(beanClazz) + builder.addConstructorArgValue(beanClazz) + builder.addConstructorArgValue(this._beanFactory) + val definition = builder.rawBeanDefinition as GenericBeanDefinition + definition.beanClass = ServiceFactory::class.java + definition.autowireMode = GenericBeanDefinition.AUTOWIRE_BY_TYPE + result[beanClazz.name] = definition } - - // 构造BeanDefinition - val builder = BeanDefinitionBuilder.genericBeanDefinition(beanClazz) - builder.addConstructorArgValue(beanClazz) - builder.addConstructorArgValue(this._beanFactory) - builder.addConstructorArgValue(implBeanDefinitions.map { it.beanClassName }) - val definition = builder.rawBeanDefinition as GenericBeanDefinition - definition.beanClass = ServiceFactory::class.java - definition.autowireMode = GenericBeanDefinition.AUTOWIRE_BY_TYPE - result[beanClazz.name] = definition } return result } diff --git a/src/gaea.app/src/main/kotlin/com/synebula/gaea/app/component/bus/EventBus.kt b/src/gaea.app/src/main/kotlin/com/synebula/gaea/app/component/bus/EventBus.kt index acd6c29..87f6c0c 100644 --- a/src/gaea.app/src/main/kotlin/com/synebula/gaea/app/component/bus/EventBus.kt +++ b/src/gaea.app/src/main/kotlin/com/synebula/gaea/app/component/bus/EventBus.kt @@ -4,5 +4,5 @@ import com.synebula.gaea.bus.Bus import org.springframework.stereotype.Component @Component -class EventBus : Bus() +class EventBus : Bus() diff --git a/src/gaea.app/src/main/kotlin/com/synebula/gaea/app/component/bus/EventBusSubscriberProcessor.kt b/src/gaea.app/src/main/kotlin/com/synebula/gaea/app/component/bus/EventBusSubscriberProcessor.kt index 2a00a4d..f87bbd6 100644 --- a/src/gaea.app/src/main/kotlin/com/synebula/gaea/app/component/bus/EventBusSubscriberProcessor.kt +++ b/src/gaea.app/src/main/kotlin/com/synebula/gaea/app/component/bus/EventBusSubscriberProcessor.kt @@ -14,7 +14,7 @@ class EventBusSubscriberProcessor : BeanPostProcessor { // 事件总线bean由Spring IoC容器负责创建,这里只需要通过@Autowired注解注入该bean即可使用事件总线 @Autowired - var messageBus: IBus? = null + var bus: IBus? = null @Throws(BeansException::class) override fun postProcessBeforeInitialization(bean: Any, beanName: String): Any { @@ -34,12 +34,15 @@ class EventBusSubscriberProcessor : BeanPostProcessor { if (annotation.annotationClass == Subscribe::class) { // 如果这是一个Guava @Subscribe注解的事件监听器方法,说明所在bean实例 // 对应一个Guava事件监听器类,将该bean实例注册到Guava事件总线 - messageBus?.register(bean) + val subscribe = annotation as Subscribe + if (subscribe.topics.isEmpty()) + bus?.register(bean, method) + else + bus?.register(subscribe.topics, bean, method) return bean } } } return bean } - } \ No newline at end of file diff --git a/src/gaea.mongodb/src/main/kotlin/com/synebula/gaea/mongodb/autoconfig/MongodbRepoFactory.kt b/src/gaea.mongodb/src/main/kotlin/com/synebula/gaea/mongodb/autoconfig/MongodbRepoFactory.kt index dbdeacf..f654626 100644 --- a/src/gaea.mongodb/src/main/kotlin/com/synebula/gaea/mongodb/autoconfig/MongodbRepoFactory.kt +++ b/src/gaea.mongodb/src/main/kotlin/com/synebula/gaea/mongodb/autoconfig/MongodbRepoFactory.kt @@ -7,9 +7,8 @@ import org.springframework.beans.factory.BeanFactory class MongodbRepoFactory( supertype: Class<*>, var beanFactory: BeanFactory, - var implementBeanNames: Array = arrayOf() ) : Factory(supertype) { override fun createProxy(): Proxy { - return MongodbRepoProxy(supertype, this.beanFactory, this.implementBeanNames) + return MongodbRepoProxy(supertype, this.beanFactory) } } \ No newline at end of file diff --git a/src/gaea.mongodb/src/main/kotlin/com/synebula/gaea/mongodb/autoconfig/MongodbRepoProxy.kt b/src/gaea.mongodb/src/main/kotlin/com/synebula/gaea/mongodb/autoconfig/MongodbRepoProxy.kt index 30e926b..5fa6459 100644 --- a/src/gaea.mongodb/src/main/kotlin/com/synebula/gaea/mongodb/autoconfig/MongodbRepoProxy.kt +++ b/src/gaea.mongodb/src/main/kotlin/com/synebula/gaea/mongodb/autoconfig/MongodbRepoProxy.kt @@ -11,7 +11,7 @@ import org.springframework.data.mongodb.core.MongoTemplate import java.lang.reflect.Method class MongodbRepoProxy( - private var supertype: Class<*>, private var beanFactory: BeanFactory, implementBeanNames: Array = arrayOf() + private var supertype: Class<*>, private var beanFactory: BeanFactory ) : Proxy() { private var mongodbRepo: Any @@ -28,15 +28,11 @@ class MongodbRepoProxy( interfaceClazz = IQuery::class.java } - if (implementBeanNames.isEmpty()) { - val constructor = clazz.getConstructor(Class::class.java, MongoTemplate::class.java) - this.mongodbRepo = constructor.newInstance( - this.supertype.getGenericInterface(interfaceClazz)!!.actualTypeArguments[0], - this.beanFactory.getBean(MongoTemplate::class.java) - ) - } else { - this.mongodbRepo = this.beanFactory.getBean(implementBeanNames[0]) - } + val constructor = clazz.getConstructor(Class::class.java, MongoTemplate::class.java) + this.mongodbRepo = constructor.newInstance( + this.supertype.getGenericInterface(interfaceClazz)!!.actualTypeArguments[0], + this.beanFactory.getBean(MongoTemplate::class.java) + ) } /** diff --git a/src/gaea.mongodb/src/main/kotlin/com/synebula/gaea/mongodb/autoconfig/MongodbRepoRegister.kt b/src/gaea.mongodb/src/main/kotlin/com/synebula/gaea/mongodb/autoconfig/MongodbRepoRegister.kt index 55d6bda..85d4fee 100644 --- a/src/gaea.mongodb/src/main/kotlin/com/synebula/gaea/mongodb/autoconfig/MongodbRepoRegister.kt +++ b/src/gaea.mongodb/src/main/kotlin/com/synebula/gaea/mongodb/autoconfig/MongodbRepoRegister.kt @@ -34,20 +34,20 @@ class MongodbRepoRegister : Register() { // 尝试获取实际继承类型 val implBeanDefinitions = this.doScan(basePackages, arrayOf(this.interfaceFilter(arrayOf(beanClazz)))) - implBeanDefinitions.forEach { - it.isAutowireCandidate = false - result[it.beanClassName!!] = it - } - // 构造BeanDefinition - val builder = BeanDefinitionBuilder.genericBeanDefinition(beanClazz) - builder.addConstructorArgValue(beanClazz) - builder.addConstructorArgValue(this._beanFactory) - builder.addConstructorArgValue(implBeanDefinitions.map { it.beanClassName }) - val definition = builder.rawBeanDefinition as GenericBeanDefinition - definition.beanClass = MongodbRepoFactory::class.java - definition.autowireMode = GenericBeanDefinition.AUTOWIRE_BY_TYPE - result[beanClazz.name] = definition + if (implBeanDefinitions.isNotEmpty()) { + implBeanDefinitions.forEach { + result[it.beanClassName!!] = it + } + } else { // 构造BeanDefinition + val builder = BeanDefinitionBuilder.genericBeanDefinition(beanClazz) + builder.addConstructorArgValue(beanClazz) + builder.addConstructorArgValue(this._beanFactory) + val definition = builder.rawBeanDefinition as GenericBeanDefinition + definition.beanClass = MongodbRepoFactory::class.java + definition.autowireMode = GenericBeanDefinition.AUTOWIRE_BY_TYPE + result[beanClazz.name] = definition + } } return result } diff --git a/src/gaea.mongodb/src/main/kotlin/com/synebula/gaea/mongodb/repository/MongodbRepository.kt b/src/gaea.mongodb/src/main/kotlin/com/synebula/gaea/mongodb/repository/MongodbRepository.kt index 8c3a348..02ba374 100644 --- a/src/gaea.mongodb/src/main/kotlin/com/synebula/gaea/mongodb/repository/MongodbRepository.kt +++ b/src/gaea.mongodb/src/main/kotlin/com/synebula/gaea/mongodb/repository/MongodbRepository.kt @@ -40,7 +40,7 @@ open class MongodbRepository, ID>( this.repo.save(list) } - override fun count(params: Map?): Int { + override fun count(params: Map?): Int { val query = Query() return this.repo.count(query.where(params, clazz), clazz).toInt() } diff --git a/src/gaea.spring/src/main/kotlin/com/synebula/gaea/spring/aop/AppAspect.kt b/src/gaea.spring/src/main/kotlin/com/synebula/gaea/spring/aop/AppAspect.kt index ace1148..5a305db 100644 --- a/src/gaea.spring/src/main/kotlin/com/synebula/gaea/spring/aop/AppAspect.kt +++ b/src/gaea.spring/src/main/kotlin/com/synebula/gaea/spring/aop/AppAspect.kt @@ -61,13 +61,13 @@ abstract class AppAspect { point.proceed() } catch (ex: Throwable) { //找到类的模块名称,否则使用类名 - var moduleName = clazz.name + var moduleName = clazz.simpleName val module = clazz.annotations.find { it is Module } if (module != null && module is Module) { moduleName = module.name } var message = "$moduleName - $funcName 异常" - message = if (ex is NoticeUserException) { + message = if (ex is NoticeUserException || ex is Error) { "$message: ${ex.message}" } else { "$message。" diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/AllowConcurrentSubscribe.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/AllowConcurrentSubscribe.kt index 1fe2f13..77687ee 100644 --- a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/AllowConcurrentSubscribe.kt +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/AllowConcurrentSubscribe.kt @@ -14,7 +14,7 @@ package com.synebula.gaea.bus /** - * Marks an message subscriber method as being thread-safe. This annotation indicates that MessageBus + * Marks a message subscriber method as being thread-safe. This annotation indicates that MessageBus * may invoke the message subscriber simultaneously from multiple threads. * * diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/AsyncBus.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/AsyncBus.kt deleted file mode 100644 index 1da1f56..0000000 --- a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/AsyncBus.kt +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright (C) 2007 The Guava Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package com.synebula.gaea.bus - -import java.util.concurrent.Executor - -/** - * An [Bus] that takes the Executor of your choice and uses it to dispatch messages, - * allowing dispatch to occur asynchronously. - * - * @author Cliff - * @since 10.0 - */ -class AsyncBus : Bus { - /** - * Creates a new AsyncMessageBus that will use `executor` to dispatch messages. Assigns `identifier` as the bus's name for logging purposes. - * - * @param identifier short name for the bus, for logging purposes. - * @param executor Executor to use to dispatch messages. It is the caller's responsibility to shut - * down the executor after the last message has been posted to this message bus. - */ - constructor(identifier: String, executor: Executor) : super( - identifier, - executor, - Dispatcher.legacyAsync(), - LoggingHandler() - ) - - /** - * Creates a new AsyncMessageBus that will use `executor` to dispatch messages. - * - * @param executor Executor to use to dispatch messages. It is the caller's responsibility to shut - * down the executor after the last message has been posted to this message bus. - * @param subscriberExceptionHandler Handler used to handle exceptions thrown from subscribers. - * See [SubscriberExceptionHandler] for more information. - * @since 16.0 - */ - constructor(executor: Executor, subscriberExceptionHandler: SubscriberExceptionHandler) : super( - "default", - executor, - Dispatcher.legacyAsync(), - subscriberExceptionHandler - ) - - /** - * Creates a new AsyncMessageBus that will use `executor` to dispatch messages. - * - * @param executor Executor to use to dispatch messages. It is the caller's responsibility to shut - * down the executor after the last message has been posted to this message bus. - */ - constructor(executor: Executor) : super( - "default", - executor, - Dispatcher.legacyAsync(), - LoggingHandler() - ) -} \ No newline at end of file diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/Bus.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/Bus.kt index bf36e15..e4c2952 100644 --- a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/Bus.kt +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/Bus.kt @@ -37,7 +37,7 @@ import java.util.logging.Logger * * To react to messages, we recommend a reactive-streams framework like [RxJava](https://github.com/ReactiveX/RxJava/wiki) (supplemented with its [RxAndroid](https://github.com/ReactiveX/RxAndroid) extension if you are building for * Android) or [Project Reactor](https://projectreactor.io/). (For the basics of - * translating code from using an message bus to using a reactive-streams framework, see these two + * translating code from using a message bus to using a reactive-streams framework, see these two * guides: [1](https://blog.jkl.gg/implementing-an-message-bus-with-rxjava-rxbus/), [2](https://lorentzos.com/rxjava-as-message-bus-the-right-way-10a36bdd49ba).) Some usages * of MessageBus may be better written using [Kotlin coroutines](https://kotlinlang.org/docs/coroutines-guide.html), including [Flow](https://kotlinlang.org/docs/flow.html) and [Channels](https://kotlinlang.org/docs/channels.html). Yet other usages are better served * by individual libraries that provide specialized support for particular use cases. @@ -60,12 +60,10 @@ import java.util.logging.Logger * * It doesn't propagate exceptions, so apps don't have a way to react to them. * * It doesn't interoperate well with RxJava, coroutines, and other more commonly used * alternatives. - * * It imposes requirements on the lifecycle of its subscribers. For example, if an message + * * It imposes requirements on the lifecycle of its subscribers. For example, if a message * occurs between when one subscriber is removed and the next subscriber is added, the message * is dropped. * * Its performance is suboptimal, especially under Android. - * * It [doesn't support parameterized - * types](https://github.com/google/guava/issues/1431). * * With the introduction of lambdas in Java 8, MessageBus went from less verbose than listeners * to [more verbose](https://github.com/google/guava/issues/3311). * @@ -94,19 +92,15 @@ import java.util.logging.Logger *

Posting Messages

* * - * To post an message, simply provide the message object to the [.post] method. The + * To post a message, simply provide the message object to the [.post] method. The * MessageBus instance will determine the type of message and route it to all registered listeners. * * - * Messages are routed based on their type an message will be delivered to any subscriber for + * Messages are routed based on their type a message will be delivered to any subscriber for * any type to which the message is *assignable.* This includes implemented interfaces, all - * superclasses, and all interfaces implemented by superclasses. + * thisclasses, and all interfaces implemented by thisclasses. * * - * When `post` is called, all registered subscribers for an message are run in sequence, so - * subscribers should be reasonably quick. If an message may trigger an extended process (such as a - * database load), spawn a thread or queue it for later. (For a convenient way to do this, use an - * [AsyncBus].) * *

Subscriber Methods

* @@ -126,12 +120,12 @@ import java.util.logging.Logger *

Dead Messages

* * - * If an message is posted, but no registered subscribers can accept it, it is considered "dead." + * If a message is posted, but no registered subscribers can accept it, it is considered "dead." * To give the system a second chance to handle dead messages, they are wrapped in an instance of * [DeadMessage] and reposted. * * - * If a subscriber for a supertype of all messages (such as Object) is registered, no message will + * If a subscriber for this type of all messages (such as Object) is registered, no message will * ever be considered dead, and no DeadMessages will be generated. Accordingly, while DeadMessage * extends [Object], a subscriber registered to receive any Object will never receive a * DeadMessage. @@ -140,8 +134,6 @@ import java.util.logging.Logger * This class is safe for concurrent use. * * - * See the Guava User Guide article on [`MessageBus`](https://github.com/google/guava/wiki/MessageBusExplained). - * * @author Cliff * @since 10.0 * @param identifier a brief name for this bus, for logging purposes. Should/home/alex/privacy/project/myths/gaea be a valid Java @@ -149,14 +141,16 @@ import java.util.logging.Logger * @param dispatcher message dispatcher. * @param exceptionHandler Handler for subscriber exceptions. */ -open class Bus( +open class Bus( override val identifier: String, override val executor: Executor, - val dispatcher: Dispatcher, - val exceptionHandler: SubscriberExceptionHandler, -) : IBus { + val dispatcher: Dispatcher, + val exceptionHandler: SubscriberExceptionHandler, +) : IBus { - private val subscribers: SubscriberRegistry = SubscriberRegistry(this) + val DEAD_TOPIC = "DEAD_TOPIC" + + private val subscribers: SubscriberRegistry = SubscriberRegistry(this) /** * Creates a new MessageBus with the given `identifier`. @@ -178,18 +172,76 @@ open class Bus( * @param exceptionHandler Handler for subscriber exceptions. * @since 16.0 */ - constructor(exceptionHandler: SubscriberExceptionHandler) : this( + constructor(exceptionHandler: SubscriberExceptionHandler) : this( "default", Executor { it.run() }, Dispatcher.perThreadDispatchQueue(), exceptionHandler ) + /** + * Creates a new AsyncMessageBus that will use `executor` to dispatch messages. Assigns `identifier` as the bus's name for logging purposes. + * + * @param identifier short name for the bus, for logging purposes. + * @param executor Executor to use to dispatch messages. It is the caller's responsibility to shut + * down the executor after the last message has been posted to this message bus. + */ + constructor(identifier: String, executor: Executor) : this( + identifier, + executor, + Dispatcher.legacyAsync(), + LoggingHandler() + ) + + /** + * Creates a new AsyncMessageBus that will use `executor` to dispatch messages. + * + * @param executor Executor to use to dispatch messages. It is the caller's responsibility to shut + * down the executor after the last message has been posted to this message bus. + * @param subscriberExceptionHandler Handler used to handle exceptions thrown from subscribers. + * See [SubscriberExceptionHandler] for more information. + * @since 16.0 + */ + constructor(executor: Executor, subscriberExceptionHandler: SubscriberExceptionHandler) : this( + "default", + executor, + Dispatcher.legacyAsync(), + subscriberExceptionHandler + ) + + /** + * Creates a new AsyncMessageBus that will use `executor` to dispatch messages. + * + * @param executor Executor to use to dispatch messages. It is the caller's responsibility to shut + * down the executor after the last message has been posted to this message bus. + */ + constructor(executor: Executor) : this( + "default", + executor, + Dispatcher.legacyAsync(), + LoggingHandler() + ) + + override fun register(topics: Array, subscriber: Any) { + subscribers.register(topics, subscriber) + } + + /** + * Registers subscriber method on `object` to receive messages. + * + * @param topics method subscribe topic. + * @param subscriber subscriber method declare object. + * @param method subscriber method should be registered. + */ + override fun register(topics: Array, subscriber: Any, method: Method) { + subscribers.register(topics, subscriber, method) + } + /** * Registers all subscriber methods on `object` to receive messages. * - * @param subscriber object whose subscriber methods should be registered. + * @param subscriber object whose subscriber methods should be registered. */ override fun register(subscriber: Any) { subscribers.register(subscriber) @@ -202,37 +254,73 @@ open class Bus( /** * Unregisters all subscriber methods on a registered `object`. * - * @param subscriber object whose subscriber methods should be unregistered. + * @param subscriber object whose subscriber methods should be unregistered. * @throws IllegalArgumentException if the object was not previously registered. */ override fun unregister(subscriber: Any) { subscribers.unregister(subscriber) } + override fun unregister(topic: String, subscriber: Any) { + subscribers.unregister(topic, subscriber) + } + /** - * Posts an message to all registered subscribers. This method will return successfully after the + * Posts a message to all registered subscribers. This method will return successfully after the * message has been posted to all subscribers, and regardless of any exceptions thrown by * subscribers. * - * - * If no subscribers have been subscribed for `message`'s class, and `message` is not - * already a [DeadMessage], it will be wrapped in a DeadMessage and reposted. - * * @param message message to post. */ - override fun publish(message: Any) { - val messageSubscribers = subscribers.getSubscribers(message) + override fun publish(message: T) { + val messageSubscribers = subscribers.getSubscribers(message::class.java.name) if (messageSubscribers.hasNext()) { dispatcher.dispatch(message, messageSubscribers) - } else if (message !is DeadMessage) { + } else { // the message had no subscribers and was not itself a DeadMessage - publish(DeadMessage(this, message)) + publish(DEAD_TOPIC, message) } } + /** + * Posts a message to all registered subscribers. This method will return successfully after the + * message has been posted to all subscribers, and regardless of any exceptions thrown by + * subscribers. + * + * @param message message to post. + */ + override fun publishAsync(message: T) { + val messageSubscribers = subscribers.getSubscribers(message::class.java.name) + if (messageSubscribers.hasNext()) { + dispatcher.dispatchAsync(message, messageSubscribers) + } else { + // the message had no subscribers and was not itself a DeadMessage + publishAsync(DEAD_TOPIC, message) + } + } + + override fun publish(topic: String, message: T) { + val messageSubscribers = subscribers.getSubscribers(topic) + if (messageSubscribers.hasNext()) { + dispatcher.dispatch(message, messageSubscribers) + } else if (topic != DEAD_TOPIC) { + // the message had no subscribers and was not itself a DeadMessage + publish(DEAD_TOPIC, message) + } + } + + override fun publishAsync(topic: String, message: T) { + val messageSubscribers = subscribers.getSubscribers(topic) + if (messageSubscribers.hasNext()) { + dispatcher.dispatchAsync(message, messageSubscribers) + } else if (topic != DEAD_TOPIC) { + // the message had no subscribers and was not itself a DeadMessage + publishAsync(DEAD_TOPIC, message) + } + } /** Handles the given exception thrown by a subscriber with the given context. */ - override fun handleException(cause: Throwable?, context: SubscriberExceptionContext) { + override fun handleException(cause: Throwable?, context: SubscriberExceptionContext) { try { exceptionHandler.handleException(cause, context) } catch (e2: Throwable) { diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/DeadMessage.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/DeadMessage.kt index 1c044ac..6255b78 100644 --- a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/DeadMessage.kt +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/DeadMessage.kt @@ -14,7 +14,7 @@ package com.synebula.gaea.bus /** - * Wraps an message that was posted, but which had no subscribers and thus could not be delivered. + * Wraps a message that was posted, but which had no subscribers and thus could not be delivered. * * * Registering a DeadMessage subscriber is useful for debugging or logging, as it can detect diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/Dispatcher.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/Dispatcher.kt index daab5f6..091519f 100644 --- a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/Dispatcher.kt +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/Dispatcher.kt @@ -23,13 +23,20 @@ import java.util.concurrent.ConcurrentLinkedQueue * * **Note:** The dispatcher is orthogonal to the subscriber's `Executor`. The dispatcher * controls the order in which messages are dispatched, while the executor controls how (i.e. on which - * thread) the subscriber is actually called when an message is dispatched to it. + * thread) the subscriber is actually called when a message is dispatched to it. * * @author Colin Decker */ abstract class Dispatcher { /** Dispatches the given `message` to the given `subscribers`. */ - abstract fun dispatch(message: T, subscribers: Iterator>?) + fun dispatch(message: T, subscribers: Iterator>?) { + while (subscribers!!.hasNext()) { + subscribers.next().dispatch(message) + } + } + + /** Dispatches the given `message` to the given `subscribers`. */ + abstract fun dispatchAsync(message: T, subscribers: Iterator>?) /** Implementation of a [.perThreadDispatchQueue] dispatcher. */ private class PerThreadQueuedDispatcher : Dispatcher() { @@ -48,7 +55,7 @@ abstract class Dispatcher { } } - override fun dispatch(message: T, subscribers: Iterator>?) { + override fun dispatchAsync(message: T, subscribers: Iterator>?) { val queueForThread = queue.get() queueForThread.offer(Message(message, subscribers)) if (!dispatching.get()) { @@ -57,7 +64,7 @@ abstract class Dispatcher { var nextMessage: Message? while (queueForThread.poll().also { nextMessage = it } != null) { while (nextMessage!!.subscribers!!.hasNext()) { - nextMessage!!.subscribers!!.next().dispatchMessage(nextMessage!!.message) + nextMessage!!.subscribers!!.next().dispatchAsync(nextMessage!!.message) } } } finally { @@ -91,32 +98,23 @@ abstract class Dispatcher { // in some cases. /** Global message queue. */ private val queue = ConcurrentLinkedQueue>() - override fun dispatch(message: T, subscribers: Iterator>?) { + override fun dispatchAsync(message: T, subscribers: Iterator>?) { while (subscribers!!.hasNext()) { queue.add(MessageWithSubscriber(message, subscribers.next())) } var e: MessageWithSubscriber? while (queue.poll().also { e = it } != null) { - e!!.subscriber!!.dispatchMessage(e!!.message) + e!!.subscriber!!.dispatchAsync(e!!.message) } } private class MessageWithSubscriber(val message: T, val subscriber: Subscriber?) } - /** Implementation of [.immediate]. */ - private class ImmediateDispatcher : Dispatcher() { - override fun dispatch(message: T, subscribers: Iterator>?) { - while (subscribers!!.hasNext()) { - subscribers.next().dispatchMessage(message) - } - } - } - companion object { /** * Returns a dispatcher that queues messages that are posted reentrantly on a thread that is already - * dispatching an message, guaranteeing that all messages posted on a single thread are dispatched to + * dispatching a message, guaranteeing that all messages posted on a single thread are dispatched to * all subscribers in the order they are posted. * * @@ -138,14 +136,5 @@ abstract class Dispatcher { fun legacyAsync(): Dispatcher { return LegacyAsyncDispatcher() } - - /** - * Returns a dispatcher that dispatches messages to subscribers immediately as they're posted - * without using an intermediate queue to change the dispatch order. This is effectively a - * depth-first dispatch order, vs. breadth-first when using a queue. - */ - fun immediate(): Dispatcher { - return ImmediateDispatcher() - } } } \ No newline at end of file diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/IBus.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/IBus.kt index 207b5e3..d0ee938 100644 --- a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/IBus.kt +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/IBus.kt @@ -33,6 +33,49 @@ interface IBus { */ fun publish(message: T) + /** + * 异步发布事件 + * @param message 事件 + */ + fun publishAsync(message: T) + + /** + * 注册事件Listener + * @param topics 主题 + * @param subscriber subscriber对象 + */ + fun register(topics: Array, subscriber: Any) + + /** + * 注册事件Listener + * @param topics 主题 + * @param subscriber subscriber对象 + * @param method Listener方法 + */ + fun register(topics: Array, subscriber: Any, method: Method) + + + /** + * 取消注册事件Listener + * @param topic 主题 + * @param subscriber subscriber对象 + */ + fun unregister(topic: String, subscriber: Any) + + + /** + * 同步发布事件 + * @param topic 主题 + * @param message 事件 + */ + fun publish(topic: String, message: T) + + /** + * 异步发布事件 + * @param topic 主题 + * @param message 事件 + */ + fun publishAsync(topic: String, message: T) /** * 异常处理 diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/Subscriber.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/Subscriber.kt index c66d422..bd0484d 100644 --- a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/Subscriber.kt +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/Subscriber.kt @@ -13,6 +13,7 @@ */ package com.synebula.gaea.bus +import com.synebula.gaea.exception.NoticeUserException import java.lang.reflect.InvocationTargetException import java.lang.reflect.Method import java.util.concurrent.Executor @@ -44,8 +45,13 @@ open class Subscriber private constructor( executor = bus.executor } + /** Dispatches `message` to this subscriber . */ + fun dispatch(message: Any) { + invokeSubscriberMethod(message) + } + /** Dispatches `message` to this subscriber using the proper executor. */ - fun dispatchMessage(message: Any) { + fun dispatchAsync(message: Any) { executor!!.execute { try { invokeSubscriberMethod(message) @@ -68,8 +74,8 @@ open class Subscriber private constructor( } catch (e: IllegalAccessException) { throw Error("Method became inaccessible: $message", e) } catch (e: InvocationTargetException) { - if (e.cause is Error) { - throw (e.cause as Error?)!! + if (e.cause is Error || e.cause is NoticeUserException) { + throw e.cause!! } throw e } diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/SubscriberRegistry.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/SubscriberRegistry.kt index 00b7244..cd713d8 100644 --- a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/SubscriberRegistry.kt +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/SubscriberRegistry.kt @@ -31,50 +31,75 @@ open class SubscriberRegistry(private val bus: IBus) { * The [CopyOnWriteArraySet] values make it easy and relatively lightweight to get an * immutable snapshot of all current subscribers to a message without any locking. */ - private val subscribers = ConcurrentHashMap, CopyOnWriteArraySet>>() + private val subscribers = ConcurrentHashMap>>() - - /** Registers all subscriber methods on the given subscriber object. */ - open fun register(subscriber: Any) { + open fun register(topics: Array, subscriber: Any) { val listenerMethods = findAllSubscribers(subscriber) - for ((eventType, messageMethodsInListener) in listenerMethods) { - var messageSubscribers = subscribers[eventType] + for (topic in topics) { + for ((_, messageMethodsInListener) in listenerMethods) { + var messageSubscribers = subscribers[topic] + if (messageSubscribers == null) { + val newSet = CopyOnWriteArraySet>() + messageSubscribers = subscribers.putIfAbsent(topic, newSet) ?: newSet + } + messageSubscribers.addAll(messageMethodsInListener) + } + } + } + + open fun register(topics: Array, subscriber: Any, method: Method) { + for (topic in topics) { + var messageSubscribers = subscribers[topic] if (messageSubscribers == null) { val newSet = CopyOnWriteArraySet>() - messageSubscribers = subscribers.putIfAbsent(eventType, newSet) ?: newSet + messageSubscribers = subscribers.putIfAbsent(topic, newSet) ?: newSet + } + messageSubscribers.add(Subscriber.create(this.bus, subscriber, method)) + } + } + + /** Registers all subscriber methods on the given subscriber object. */ + fun register(subscriber: Any) { + val listenerMethods = findAllSubscribers(subscriber) + for ((topic, messageMethodsInListener) in listenerMethods) { + var messageSubscribers = subscribers[topic] + if (messageSubscribers == null) { + val newSet = CopyOnWriteArraySet>() + messageSubscribers = subscribers.putIfAbsent(topic, newSet) ?: newSet } messageSubscribers.addAll(messageMethodsInListener) } } /** Registers subscriber method on the given subscriber object. */ - open fun register(subscriber: Any, method: Method) { + fun register(subscriber: Any, method: Method) { val parameterTypes = method.parameterTypes - val eventType = parameterTypes[0] check(parameterTypes.size == 1) { "Method $method has @SubscribeTopic annotation but has ${parameterTypes.size} parameters. Subscriber methods must have exactly 1 parameter." } check(!parameterTypes[0].isPrimitive) { "@SubscribeTopic method $method's parameter is ${parameterTypes[0].name}. Subscriber methods cannot accept primitives. " } - var messageSubscribers = subscribers[eventType] + + val eventType = parameterTypes[0] + val topic = eventType.name + var messageSubscribers = subscribers[topic] if (messageSubscribers == null) { val newSet = CopyOnWriteArraySet>() - messageSubscribers = subscribers.putIfAbsent(eventType, newSet) ?: newSet + messageSubscribers = subscribers.putIfAbsent(topic, newSet) ?: newSet } messageSubscribers.add(Subscriber.create(bus, subscriber, method)) } - - /** Unregisters all subscribers on the given subscriber object. */ - open fun unregister(subscriber: Any) { + /** Unregisters all subscribers on the given subscriber object. */ + fun unregister(subscriber: Any) { val listenerMethods = findAllSubscribers(subscriber) - for ((eventType, listenerMethodsForType) in listenerMethods) { - val currentSubscribers = subscribers[eventType] + for ((topic, listenerMethodsForType) in listenerMethods) { + val currentSubscribers = subscribers[topic] require(!(currentSubscribers == null || !currentSubscribers.removeAll(listenerMethodsForType.toSet()))) { // if removeAll returns true, all we really know is that at least one subscriber was // removed... however, barring something very strange we can assume that if at least one - // subscriber was removed, all subscribers on subscriber for that message type were... after + // subscriber was removed, all subscribers on subscriber for that message type were... after // all, the definition of subscribers on a particular class is totally static "missing message subscriber for an annotated method. Is $subscriber registered?" } @@ -84,30 +109,55 @@ open class SubscriberRegistry(private val bus: IBus) { } } + /** Unregisters all subscribers by topic on the given subscriber object. */ + open fun unregister(topic: String, subscriber: Any) { + val listenerMethods = findAllSubscribers(subscriber) + if (listenerMethods[topic] == null) return //不包含topic则推出 + val currentSubscribers = subscribers[topic] + require(!(currentSubscribers == null || !currentSubscribers.removeAll(listenerMethods[topic]!!.toSet()))) { + // if removeAll returns true, all we really know is that at least one subscriber was + // removed... however, barring something very strange we can assume that if at least one + // subscriber was removed, all subscribers on subscriber for that message type were... after + // all, the definition of subscribers on a particular class is totally static + "missing message subscriber for an annotated method. Is $subscriber registered?" + } + + // don't try to remove the set if it's empty; that can't be done safely without a lock + // anyway, if the set is empty it'll just be wrapping an array of length 0 + } + + /** * Gets an iterator representing an immutable snapshot of all subscribers to the given message at * the time this method is called. */ - fun getSubscribers(eventType: Any): Iterator> { - val eventSubscribers: CopyOnWriteArraySet> = - subscribers[eventType.javaClass] ?: CopyOnWriteArraySet() - return eventSubscribers.iterator() + fun getSubscribers(topic: String): Iterator> { + val topicSubscribers: CopyOnWriteArraySet> = subscribers[topic] ?: CopyOnWriteArraySet() + return topicSubscribers.iterator() } /** - * Returns all subscribers for the given subscriber grouped by the type of message they subscribe to. + * Returns all subscribers for the given subscriber grouped by the type of message they subscribe to. */ - private fun findAllSubscribers(subscriber: Any): Map, List>> { - val methodsInListener = mutableMapOf, List>>() + private fun findAllSubscribers(subscriber: Any): Map>> { + val methodsInListener = mutableMapOf>>() val clazz: Class<*> = subscriber.javaClass for (method in getAnnotatedMethods(clazz)) { - val parameterTypes = method.parameterTypes - val eventType = parameterTypes[0] - val methods = methodsInListener[eventType]?.toMutableList() ?: mutableListOf() - methods.add(Subscriber.create(bus, subscriber, method)) - methodsInListener[eventType] = methods + var topics = method.getAnnotation(Subscribe::class.java).topics + + //如果没有定义topic,则使用消息类名称做topic + if (topics.isEmpty()) { + val parameterTypes = method.parameterTypes + val messageType = parameterTypes[0] + topics = arrayOf(messageType.name) + } + for (topic in topics) { + val listeners = methodsInListener.getOrDefault(topic, mutableListOf()) + listeners.add(Subscriber.create(bus, subscriber, method)) + methodsInListener[topic] = listeners + } } - return methodsInListener.toMap() + return methodsInListener } diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/AsyncMessageBus.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/AsyncMessageBus.kt deleted file mode 100644 index 30cb8b5..0000000 --- a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/AsyncMessageBus.kt +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright (C) 2007 The Guava Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package com.synebula.gaea.bus.messagebus - -import com.synebula.gaea.bus.Dispatcher -import com.synebula.gaea.bus.LoggingHandler -import com.synebula.gaea.bus.SubscriberExceptionHandler -import java.util.concurrent.Executor - -/** - * An [MessageBus] that takes the Executor of your choice and uses it to dispatch events, - * allowing dispatch to occur asynchronously. - * - * @author Cliff - * @since 10.0 - */ -class AsyncMessageBus : MessageBus { - /** - * Creates a new AsyncEventBus that will use `executor` to dispatch events. Assigns `identifier` as the bus's name for logging purposes. - * - * @param identifier short name for the bus, for logging purposes. - * @param executor Executor to use to dispatch events. It is the caller's responsibility to shut - * down the executor after the last event has been posted to this event bus. - */ - constructor(identifier: String, executor: Executor) : super( - identifier, - executor, - Dispatcher.legacyAsync(), - LoggingHandler() - ) - - /** - * Creates a new AsyncEventBus that will use `executor` to dispatch events. - * - * @param executor Executor to use to dispatch events. It is the caller's responsibility to shut - * down the executor after the last event has been posted to this event bus. - * @param subscriberExceptionHandler Handler used to handle exceptions thrown from subscribers. - * See [SubscriberExceptionHandler] for more information. - * @since 16.0 - */ - constructor(executor: Executor, subscriberExceptionHandler: SubscriberExceptionHandler) : super( - "default", - executor, - Dispatcher.legacyAsync(), - subscriberExceptionHandler - ) - - /** - * Creates a new AsyncEventBus that will use `executor` to dispatch events. - * - * @param executor Executor to use to dispatch events. It is the caller's responsibility to shut - * down the executor after the last event has been posted to this event bus. - */ - constructor(executor: Executor) : super( - "default", - executor, - Dispatcher.legacyAsync(), - LoggingHandler() - ) -} \ No newline at end of file diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/IMessageBus.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/IMessageBus.kt deleted file mode 100644 index da731fb..0000000 --- a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/IMessageBus.kt +++ /dev/null @@ -1,39 +0,0 @@ -package com.synebula.gaea.bus.messagebus - -import com.synebula.gaea.bus.IBus -import java.lang.reflect.Method - -interface IMessageBus : IBus { - - /** - * 注册事件Listener - * @param topics 主题 - * @param subscriber subscriber对象 - */ - fun register(topics: Array, subscriber: Any) - - /** - * 注册事件Listener - * @param topics 主题 - * @param subscriber subscriber对象 - * @param method Listener方法 - */ - fun register(topics: Array, subscriber: Any, method: Method) - - - /** - * 取消注册事件Listener - * @param topic 主题 - * @param subscriber subscriber对象 - */ - fun unregister(topic: String, subscriber: Any) - - - /** - * 同步发布事件 - * @param topic 主题 - * @param message 事件 - */ - fun publish(topic: String, message: T) - -} \ No newline at end of file diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/MessageBus.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/MessageBus.kt deleted file mode 100644 index db6a4aa..0000000 --- a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/MessageBus.kt +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Copyright (C) 2007 The Guava Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package com.synebula.gaea.bus.messagebus - -import com.synebula.gaea.bus.* -import java.lang.reflect.Method -import java.util.* -import java.util.concurrent.Executor -import java.util.logging.Level -import java.util.logging.Logger - -/** - * Dispatches events to listeners, and provides ways for listeners to register themselves. - * - *

Avoid EventBus

- * - * - * **We recommend against using EventBus.** It was designed many years ago, and newer - * libraries offer better ways to decouple components and react to events. - * - * - * To decouple components, we recommend a dependency-injection framework. For Android code, most - * apps use [Dagger](https://dagger.dev). For server code, common options include [Guice](https://github.com/google/guice/wiki/Motivation) and [Spring](https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-introduction). - * Frameworks typically offer a way to register multiple listeners independently and then request - * them together as a set ([Dagger](https://dagger.dev/dev-guide/multibindings), [Guice](https://github.com/google/guice/wiki/Multibindings), [Spring](https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-autowired-annotation)). - * - * - * To react to events, we recommend a reactive-streams framework like [RxJava](https://github.com/ReactiveX/RxJava/wiki) (supplemented with its [RxAndroid](https://github.com/ReactiveX/RxAndroid) extension if you are building for - * Android) or [Project Reactor](https://projectreactor.io/). (For the basics of - * translating code from using an event bus to using a reactive-streams framework, see these two - * guides: [1](https://blog.jkl.gg/implementing-an-event-bus-with-rxjava-rxbus/), [2](https://lorentzos.com/rxjava-as-event-bus-the-right-way-10a36bdd49ba).) Some usages - * of EventBus may be better written using [Kotlin coroutines](https://kotlinlang.org/docs/coroutines-guide.html), including [Flow](https://kotlinlang.org/docs/flow.html) and [Channels](https://kotlinlang.org/docs/channels.html). Yet other usages are better served - * by individual libraries that provide specialized support for particular use cases. - * - * - * Disadvantages of EventBus include: - * - * - * * It makes the cross-references between producer and subscriber harder to find. This can - * complicate debugging, lead to unintentional reentrant calls, and force apps to eagerly - * initialize all possible subscribers at startup time. - * * It uses reflection in ways that break when code is processed by optimizers/minimizer like - * [R8 and Proguard](https://developer.android.com/studio/build/shrink-code). - * * It doesn't offer a way to wait for multiple events before taking action. For example, it - * doesn't offer a way to wait for multiple producers to all report that they're "ready," nor - * does it offer a way to batch multiple events from a single producer together. - * * It doesn't support backpressure and other features needed for resilience. - * * It doesn't provide much control of threading. - * * It doesn't offer much monitoring. - * * It doesn't propagate exceptions, so apps don't have a way to react to them. - * * It doesn't interoperate well with RxJava, coroutines, and other more commonly used - * alternatives. - * * It imposes requirements on the lifecycle of its subscribers. For example, if an event - * occurs between when one subscriber is removed and the next subscriber is added, the event - * is dropped. - * * Its performance is suboptimal, especially under Android. - * * It [doesn't support parameterized - * types](https://github.com/google/guava/issues/1431). - * * With the introduction of lambdas in Java 8, EventBus went from less verbose than listeners - * to [more verbose](https://github.com/google/guava/issues/3311). - * - * - *

EventBus Summary

- * - * - * The EventBus allows publish-subscribe-style communication between components without requiring - * the components to explicitly register with one another (and thus be aware of each other). It is - * designed exclusively to replace traditional Java in-process event distribution using explicit - * registration. It is *not* a general-purpose publish-subscribe system, nor is it intended - * for interprocess communication. - * - *

Receiving Events

- * - * - * To receive events, an object should: - * - * - * 1. Expose a public method, known as the *event subscriber*, which accepts a single - * argument of the type of event desired; - * 1. Mark it with a [Subscribe] annotation; - * 1. Pass itself to an EventBus instance's [.register] method. - * - * - *

Posting Events

- * - * - * To post an event, simply provide the event object to the [.post] method. The - * EventBus instance will determine the type of event and route it to all registered listeners. - * - * - * Events are routed based on their type an event will be delivered to any subscriber for - * any type to which the event is *assignable.* This includes implemented interfaces, all - * superclasses, and all interfaces implemented by superclasses. - * - * - * When `post` is called, all registered subscribers for an event are run in sequence, so - * subscribers should be reasonably quick. If an event may trigger an extended process (such as a - * database load), spawn a thread or queue it for later. (For a convenient way to do this, use an - * [AsyncMessageBus].) - * - *

Subscriber Methods

- * - * - * Event subscriber methods must accept only one argument: the event. - * - * - * Subscribers should not, in general, throw. If they do, the EventBus will catch and log the - * exception. This is rarely the right solution for error handling and should not be relied upon; it - * is intended solely to help find problems during development. - * - * - * The EventBus guarantees that it will not call a subscriber method from multiple threads - * simultaneously, unless the method explicitly allows it by bearing the [ ] annotation. If this annotation is not present, subscriber methods need not - * worry about being reentrant, unless also called from outside the EventBus. - * - *

Dead Events

- * - * - * If an event is posted, but no registered subscribers can accept it, it is considered "dead." - * To give the system a second chance to handle dead events, they are wrapped in an instance of - * [DeadMessage] and reposted. - * - * - * If a subscriber for a supertype of all events (such as Object) is registered, no event will - * ever be considered dead, and no DeadEvents will be generated. Accordingly, while DeadMessage - * extends [Object], a subscriber registered to receive any Object will never receive a - * DeadMessage. - * - * - * This class is safe for concurrent use. - * - * - * See the Guava User Guide article on [`EventBus`](https://github.com/google/guava/wiki/EventBusExplained). - * - * @author Cliff - * @param identifier a brief name for this bus, for logging purposes. Should/home/alex/privacy/project/myths/gaea be a valid Java - * @param executor the default executor this event bus uses for dispatching events to subscribers. - * @param dispatcher message dispatcher. - * @param exceptionHandler Handler for subscriber exceptions. - */ -open class MessageBus internal constructor( - override val identifier: String, - override val executor: Executor, - val dispatcher: Dispatcher, - val exceptionHandler: SubscriberExceptionHandler, -) : IMessageBus { - val DEAD_TOPIC = "DEAD_TOPIC" - - val subscribers = TopicSubscriberRegistry(this) - - /** - * Creates a new EventBus with the given `identifier`. - * - * @param identifier a brief name for this bus, for logging purposes. Should/home/alex/privacy/project/myths/gaea be a valid Java - * identifier. - */ - @JvmOverloads - constructor(identifier: String = "default") : this( - identifier, - Executor { it.run() }, - Dispatcher.perThreadDispatchQueue(), - LoggingHandler() - ) - - /** - * Creates a new EventBus with the given [SubscriberExceptionHandler]. - * - * @param exceptionHandler Handler for subscriber exceptions. - * @since 16.0 - */ - constructor(exceptionHandler: SubscriberExceptionHandler) : this( - "default", - Executor { it.run() }, - Dispatcher.perThreadDispatchQueue(), - exceptionHandler - ) - - override fun register(topics: Array, subscriber: Any) { - subscribers.register(topics, subscriber) - } - - /** - * Registers subscriber method on `object` to receive messages. - * - * @param topics method subscribe topic. - * @param subscriber subscriber method declare object. - * @param method subscriber method should be registered. - */ - override fun register(topics: Array, subscriber: Any, method: Method) { - subscribers.register(topics, subscriber, method) - } - - - /** - * Registers all subscriber methods on `object` to receive messages. - * - * @param subscriber object whose subscriber methods should be registered. - */ - override fun register(subscriber: Any) { - subscribers.register(subscriber) - } - - override fun register(subscriber: Any, method: Method) { - subscribers.register(subscriber, method) - } - - /** - * Unregisters all subscriber methods on a registered `object`. - * - * @param subscriber object whose subscriber methods should be unregistered. - * @throws IllegalArgumentException if the object was not previously registered. - */ - override fun unregister(subscriber: Any) { - subscribers.unregister(subscriber) - } - - override fun unregister(topic: String, subscriber: Any) { - subscribers.unregister(topic, subscriber) - } - - /** - * Posts an message to all registered subscribers. This method will return successfully after the - * message has been posted to all subscribers, and regardless of any exceptions thrown by - * subscribers. - * - * @param message message to post. - */ - override fun publish(message: T) { - val messageSubscribers = subscribers.getSubscribers(message.javaClass.name) - if (messageSubscribers.hasNext()) { - dispatcher.dispatch(message, messageSubscribers) - } else { - // the message had no subscribers and was not itself a DeadMessage - publish(DEAD_TOPIC, message) - } - } - - override fun publish(topic: String, message: T) { - val messageSubscribers = subscribers.getSubscribers(topic) - if (messageSubscribers.hasNext()) { - dispatcher.dispatch(message, messageSubscribers) - } else if (topic != DEAD_TOPIC) { - // the message had no subscribers and was not itself a DeadMessage - publish(DEAD_TOPIC, message) - } - } - - - /** Handles the given exception thrown by a subscriber with the given context. */ - override fun handleException(cause: Throwable?, context: SubscriberExceptionContext) { - try { - exceptionHandler.handleException(cause, context) - } catch (e2: Throwable) { - // if the handler threw an exception... well, just log it - logger.log( - Level.SEVERE, String.format(Locale.ROOT, "Exception %s thrown while handling exception: %s", e2, cause), - e2 - ) - } - } - - override fun toString(): String { - return "MessageBus(identifier='$identifier', executor=$executor, dispatcher=$dispatcher, exceptionHandler=$exceptionHandler, subscribers=$subscribers)" - } - - companion object { - private val logger = Logger.getLogger(Bus::class.java.name) - } -} \ No newline at end of file diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/TopicSubscriberRegistry.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/TopicSubscriberRegistry.kt deleted file mode 100644 index 6c8d0f7..0000000 --- a/src/gaea/src/main/kotlin/com/synebula/gaea/bus/messagebus/TopicSubscriberRegistry.kt +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Copyright (C) 2014 The Guava Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package com.synebula.gaea.bus.messagebus - -import com.synebula.gaea.bus.Subscribe -import com.synebula.gaea.bus.Subscriber -import com.synebula.gaea.bus.SubscriberRegistry -import java.lang.reflect.Method -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.CopyOnWriteArraySet - -/** - * Registry of subscribers to a single message bus. - * - * @author Colin Decker - */ -open class TopicSubscriberRegistry(private val bus: IMessageBus) : SubscriberRegistry(bus) { - /** - * All registered subscribers, indexed by message type. - * - * - * The [CopyOnWriteArraySet] values make it easy and relatively lightweight to get an - * immutable snapshot of all current subscribers to an message without any locking. - */ - private val subscribers = ConcurrentHashMap>>() - - open fun register(topics: Array, subscriber: Any) { - val listenerMethods = findAllSubscribers(subscriber) - for (topic in topics) { - for ((_, messageMethodsInListener) in listenerMethods) { - var messageSubscribers = subscribers[topic] - if (messageSubscribers == null) { - val newSet = CopyOnWriteArraySet>() - messageSubscribers = subscribers.putIfAbsent(topic, newSet) ?: newSet - } - messageSubscribers.addAll(messageMethodsInListener) - } - } - } - - open fun register(topics: Array, subscriber: Any, method: Method) { - for (topic in topics) { - var messageSubscribers = subscribers[topic] - if (messageSubscribers == null) { - val newSet = CopyOnWriteArraySet>() - messageSubscribers = subscribers.putIfAbsent(topic, newSet) ?: newSet - } - messageSubscribers.add(Subscriber.create(this.bus, subscriber, method)) - } - } - - /** Registers all subscriber methods on the given subscriber object. */ - override fun register(subscriber: Any) { - val listenerMethods = findAllSubscribers(subscriber) - for ((topic, messageMethodsInListener) in listenerMethods) { - var messageSubscribers = subscribers[topic] - if (messageSubscribers == null) { - val newSet = CopyOnWriteArraySet>() - messageSubscribers = subscribers.putIfAbsent(topic, newSet) ?: newSet - } - messageSubscribers.addAll(messageMethodsInListener) - } - } - - /** Registers subscriber method on the given subscriber object. */ - override fun register(subscriber: Any, method: Method) { - val parameterTypes = method.parameterTypes - check(parameterTypes.size == 1) { - "Method $method has @SubscribeTopic annotation but has ${parameterTypes.size} parameters. Subscriber methods must have exactly 1 parameter." - } - check(!parameterTypes[0].isPrimitive) { - "@SubscribeTopic method $method's parameter is ${parameterTypes[0].name}. Subscriber methods cannot accept primitives. " - } - - val eventType = parameterTypes[0] - val topic = eventType.name - var messageSubscribers = subscribers[topic] - if (messageSubscribers == null) { - val newSet = CopyOnWriteArraySet>() - messageSubscribers = subscribers.putIfAbsent(topic, newSet) ?: newSet - } - messageSubscribers.add(Subscriber.create(bus, subscriber, method)) - } - - /** Unregisters all subscribers on the given subscriber object. */ - override fun unregister(subscriber: Any) { - val listenerMethods = findAllSubscribers(subscriber) - for ((topic, listenerMethodsForType) in listenerMethods) { - val currentSubscribers = subscribers[topic] - require(!(currentSubscribers == null || !currentSubscribers.removeAll(listenerMethodsForType.toSet()))) { - // if removeAll returns true, all we really know is that at least one subscriber was - // removed... however, barring something very strange we can assume that if at least one - // subscriber was removed, all subscribers on subscriber for that message type were... after - // all, the definition of subscribers on a particular class is totally static - "missing message subscriber for an annotated method. Is $subscriber registered?" - } - - // don't try to remove the set if it's empty; that can't be done safely without a lock - // anyway, if the set is empty it'll just be wrapping an array of length 0 - } - } - - /** Unregisters all subscribers by topic on the given subscriber object. */ - open fun unregister(topic: String, subscriber: Any) { - val listenerMethods = findAllSubscribers(subscriber) - if (listenerMethods[topic] == null) return //不包含topic则推出 - val currentSubscribers = subscribers[topic] - require(!(currentSubscribers == null || !currentSubscribers.removeAll(listenerMethods[topic]!!.toSet()))) { - // if removeAll returns true, all we really know is that at least one subscriber was - // removed... however, barring something very strange we can assume that if at least one - // subscriber was removed, all subscribers on subscriber for that message type were... after - // all, the definition of subscribers on a particular class is totally static - "missing message subscriber for an annotated method. Is $subscriber registered?" - } - - // don't try to remove the set if it's empty; that can't be done safely without a lock - // anyway, if the set is empty it'll just be wrapping an array of length 0 - } - - - /** - * Gets an iterator representing an immutable snapshot of all subscribers to the given message at - * the time this method is called. - */ - fun getSubscribers(topic: String): Iterator> { - val topicSubscribers: CopyOnWriteArraySet> = subscribers[topic] ?: CopyOnWriteArraySet() - return topicSubscribers.iterator() - } - - /** - * Returns all subscribers for the given subscriber grouped by the type of message they subscribe to. - */ - private fun findAllSubscribers(subscriber: Any): Map>> { - val methodsInListener = mutableMapOf>>() - val clazz: Class<*> = subscriber.javaClass - for (method in getAnnotatedMethods(clazz)) { - var topics = method.getAnnotation(Subscribe::class.java).topics - - //如果没有定义topic,则使用消息类名称做topic - if (topics.isEmpty()) { - val parameterTypes = method.parameterTypes - val messageType = parameterTypes[0] - topics = arrayOf(messageType.name) - } - for (topic in topics) { - val listeners = methodsInListener.getOrDefault(topic, mutableListOf()) - listeners.add(Subscriber.create(bus, subscriber, method)) - methodsInListener[topic] = listeners - } - } - return methodsInListener - } - - - class MethodIdentifier(method: Method) { - private val name: String = method.name - private val parameterTypes: List> = listOf(*method.parameterTypes) - - override fun hashCode(): Int { - var result = name.hashCode() - for (element in parameterTypes) result = 31 * result + element.hashCode() - return result - } - - override fun equals(other: Any?): Boolean { - if (other is MethodIdentifier) { - return name == other.name && parameterTypes == other.parameterTypes - } - return false - } - } - -} \ No newline at end of file diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/domain/event/AfterRemoveEvent.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/domain/event/AfterRemoveEvent.kt new file mode 100644 index 0000000..6c3b076 --- /dev/null +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/domain/event/AfterRemoveEvent.kt @@ -0,0 +1,5 @@ +package com.synebula.gaea.domain.event + +import com.synebula.gaea.domain.model.IAggregateRoot + +class AfterRemoveEvent, I>(var id: I? = null) \ No newline at end of file diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/domain/event/BeforeRemoveEvent.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/domain/event/BeforeRemoveEvent.kt new file mode 100644 index 0000000..21a9211 --- /dev/null +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/domain/event/BeforeRemoveEvent.kt @@ -0,0 +1,5 @@ +package com.synebula.gaea.domain.event + +import com.synebula.gaea.domain.model.IAggregateRoot + +class BeforeRemoveEvent, I>(var id: I? = null) \ No newline at end of file diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/domain/repository/IRepository.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/domain/repository/IRepository.kt index 99c411a..e8fa6cd 100644 --- a/src/gaea/src/main/kotlin/com/synebula/gaea/domain/repository/IRepository.kt +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/domain/repository/IRepository.kt @@ -68,7 +68,7 @@ interface IRepository, ID> { * @param params 查询条件。 * @return int */ - fun count(params: Map?): Int + fun count(params: Map?): Int } diff --git a/src/gaea/src/main/kotlin/com/synebula/gaea/domain/service/Service.kt b/src/gaea/src/main/kotlin/com/synebula/gaea/domain/service/Service.kt index 1856b12..27b8021 100644 --- a/src/gaea/src/main/kotlin/com/synebula/gaea/domain/service/Service.kt +++ b/src/gaea/src/main/kotlin/com/synebula/gaea/domain/service/Service.kt @@ -1,9 +1,14 @@ package com.synebula.gaea.domain.service +import com.synebula.gaea.bus.IBus import com.synebula.gaea.data.message.DataMessage import com.synebula.gaea.data.serialization.IObjectMapper +import com.synebula.gaea.domain.event.AfterRemoveEvent +import com.synebula.gaea.domain.event.BeforeRemoveEvent import com.synebula.gaea.domain.model.IAggregateRoot import com.synebula.gaea.domain.repository.IRepository +import com.synebula.gaea.ext.firstCharLowerCase +import javax.annotation.Resource /** @@ -22,6 +27,8 @@ open class Service, ID>( protected open var repository: IRepository, protected open var mapper: IObjectMapper, ) : IService { + @Resource + protected open var bus: IBus? = null /** * 增加对象 @@ -73,7 +80,17 @@ open class Service, ID>( * @param id 对象ID */ override fun remove(id: ID) { + val beforeRemoveEvent = BeforeRemoveEvent(id) + this.bus?.publish( + "${this.clazz.simpleName.firstCharLowerCase()}${BeforeRemoveEvent::class.java.simpleName}", + beforeRemoveEvent + ) this.repository.remove(id) + val afterRemoveEvent = AfterRemoveEvent(id) + this.bus?.publish( + "${this.clazz.simpleName.firstCharLowerCase()}${AfterRemoveEvent::class.java.simpleName}", + afterRemoveEvent + ) } /** diff --git a/src/gaea/src/test/kotlin/com/synebula/gaea/bus/BusTest.kt b/src/gaea/src/test/kotlin/com/synebula/gaea/bus/BusTest.kt index 0022320..da0a145 100644 --- a/src/gaea/src/test/kotlin/com/synebula/gaea/bus/BusTest.kt +++ b/src/gaea/src/test/kotlin/com/synebula/gaea/bus/BusTest.kt @@ -1,8 +1,5 @@ package com.synebula.gaea.bus -import com.synebula.gaea.bus.messagebus.AsyncMessageBus -import com.synebula.gaea.bus.messagebus.IMessageBus -import com.synebula.gaea.bus.messagebus.MessageBus import org.junit.Test import java.util.concurrent.Executors @@ -21,36 +18,36 @@ class BusTest { @Test fun testAsyncBus() { - val bus: IBus = AsyncBus(Executors.newFixedThreadPool(10)) + val bus: IBus = Bus(Executors.newFixedThreadPool(10)) val subscriber = TestSubscriber() bus.register(subscriber, subscriber.javaClass.declaredMethods[0]) bus.register(subscriber, subscriber.javaClass.declaredMethods[1]) - bus.publish("Hello world") - bus.publish(subscriber) + bus.publishAsync("Hello world") + bus.publishAsync(subscriber) } @Test - fun testMessageBus() { - val bus: IMessageBus = MessageBus() + fun testTopicBus() { + val bus: IBus = Bus() val subscriber = TestTopicSubscriber() bus.register(subscriber) bus.publish("hello", "Hello world") - bus.publish("whoami", subscriber) + bus.publishAsync("whoami", subscriber) } @Test - fun testMessageBus2() { - val bus: IMessageBus = AsyncMessageBus(Executors.newFixedThreadPool(10)) + fun testAsyncTopicBus() { + val bus: IBus = Bus(Executors.newFixedThreadPool(10)) val subscriber = TestTopicSubscriber() val subscriber2 = TestTopicSubscriber2() bus.register(subscriber) bus.register(subscriber2) - bus.publish("hello", "Hello world") - bus.publish("whoami", subscriber) + bus.publishAsync("hello", "Hello world") + bus.publishAsync("whoami", subscriber) bus.unregister(subscriber2) - bus.publish("hello", "Hello world") - bus.publish("whoami", subscriber) + bus.publishAsync("hello", "Hello world") + bus.publishAsync("whoami", subscriber) } internal class TestSubscriber {