增加消息订阅注解 DomainSubscribe; 修改异常提示的获取方式

This commit is contained in:
2022-08-30 11:10:59 +08:00
parent 4a45f7d61f
commit ea5c4160f1
20 changed files with 214 additions and 86 deletions

View File

@@ -1,7 +1,9 @@
package com.synebula.gaea.app.component.bus
import com.synebula.gaea.bus.DomainSubscribe
import com.synebula.gaea.bus.IBus
import com.synebula.gaea.bus.Subscribe
import com.synebula.gaea.data.message.messageTopic
import org.springframework.beans.BeansException
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.config.BeanPostProcessor
@@ -27,20 +29,19 @@ class EventBusSubscriberProcessor : BeanPostProcessor {
// for each method in the bean
val methods: Array<Method> = bean.javaClass.methods
for (method in methods) {
// check the annotations on that method
val annotations: Array<Annotation> = method.annotations
for (annotation in annotations) {
// if it contains Subscribe annotation
if (annotation.annotationClass == Subscribe::class) {
// 如果这是一个Guava @Subscribe注解的事件监听器方法说明所在bean实例
// 对应一个Guava事件监听器类将该bean实例注册到Guava事件总线
val subscribe = annotation as Subscribe
if (subscribe.topics.isEmpty())
bus?.register(bean, method)
else
bus?.register(subscribe.topics, bean, method)
return bean
}
if (method.isAnnotationPresent(Subscribe::class.java)) {
val subscribe = method.getAnnotation(Subscribe::class.java)
if (subscribe.topics.isEmpty())
bus?.register(bean, method)
else
bus?.register(subscribe.topics, bean, method)
}
if (method.isAnnotationPresent(DomainSubscribe::class.java)) {
val domainSubscribe = method.getAnnotation(DomainSubscribe::class.java)
var topic = messageTopic(domainSubscribe.domain, domainSubscribe.messageClass.java)
if (domainSubscribe.domainClass != Nothing::class)
topic = messageTopic(domainSubscribe.domainClass.java, domainSubscribe.messageClass.java)
bus?.register(arrayOf(topic), bean, method)
}
}
return bean

View File

@@ -1,6 +1,5 @@
package com.synebula.gaea.mongodb.query
import com.synebula.gaea.ext.fieldNames
import com.synebula.gaea.ext.firstCharLowerCase
import com.synebula.gaea.mongodb.order
import com.synebula.gaea.mongodb.select
@@ -10,6 +9,7 @@ import com.synebula.gaea.query.IQuery
import com.synebula.gaea.query.Page
import com.synebula.gaea.query.Params
import com.synebula.gaea.query.Table
import com.synebula.gaea.reflect.fieldNames
import org.springframework.data.mongodb.core.MongoTemplate
import org.springframework.data.mongodb.core.query.Criteria
import org.springframework.data.mongodb.core.query.Query
@@ -70,15 +70,8 @@ open class MongodbQuery<TView, ID>(override var clazz: Class<TView>, var templat
return this.template.find(query, clazz, this.collection(clazz))
}
protected fun fields(clazz: Class<TView>): Array<String> {
val fields = mutableListOf<String>()
fields.addAll(clazz.fieldNames())
var parent = clazz.superclass
while (parent != Any::class.java) {
fields.addAll(clazz.superclass.fieldNames())
parent = parent.superclass
}
return fields.toTypedArray()
fun <TView> fields(clazz: Class<TView>): Array<String> {
return clazz.fieldNames().toTypedArray()
}
/**

View File

@@ -1,7 +1,6 @@
package com.synebula.gaea.mongodb.query
import com.synebula.gaea.ext.fieldNames
import com.synebula.gaea.ext.firstCharLowerCase
import com.synebula.gaea.mongodb.order
import com.synebula.gaea.mongodb.select
@@ -11,6 +10,7 @@ import com.synebula.gaea.query.IUniversalQuery
import com.synebula.gaea.query.Page
import com.synebula.gaea.query.Params
import com.synebula.gaea.query.Table
import com.synebula.gaea.reflect.fieldNames
import org.springframework.data.mongodb.core.MongoTemplate
import org.springframework.data.mongodb.core.query.Criteria
import org.springframework.data.mongodb.core.query.Query
@@ -70,14 +70,7 @@ open class MongodbUniversalQuery(var template: MongoTemplate) : IUniversalQuery
}
fun <TView> fields(clazz: Class<TView>): Array<String> {
val fields = mutableListOf<String>()
fields.addAll(clazz.fieldNames())
var parent = clazz.superclass
while (parent != Any::class.java) {
fields.addAll(clazz.superclass.fieldNames())
parent = parent.superclass
}
return fields.toTypedArray()
return clazz.fieldNames().toTypedArray()
}
/**

View File

@@ -60,7 +60,7 @@ abstract class AppAspect {
point.proceed()
} catch (ex: Throwable) {
val moduleName = this.resolveModuleName(point.`this`)
var message = "$moduleName - ${funcName}异常"
var message = "${moduleName}-${funcName}异常"
message = if (ex is NoticeUserException || ex is Error) {
"$message: ${ex.message}"
} else {
@@ -91,10 +91,12 @@ abstract class AppAspect {
moduleName = module.name
}
// 3.尝试找类的name字段作为模块名称
val nameField = clazz.fields.find { it.name == "name" }
if (nameField != null) {
moduleName = nameField.get(obj).toString()
try {
val nameGetter = clazz.getMethod("getName")
moduleName = nameGetter.invoke(obj).toString()
} catch (_: NoSuchMethodException) {
}
return moduleName
}
}

View File

@@ -0,0 +1,44 @@
/*
* Copyright (C) 2007 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.synebula.gaea.bus
import kotlin.reflect.KClass
/**
* Marks a method as a message subscriber.
*
*
* The type of message will be indicated by the method's first (and only) parameter, which cannot
* be primitive. If this annotation is applied to methods with zero parameters, or more than one
* parameter, the object containing the method will not be able to register for message delivery from
* the [Bus].
*
*
* Unless also annotated with @[AllowConcurrentSubscribe], message subscriber methods will be
* invoked serially by each message bus that they are registered with.
*
* @author Cliff
* @since 10.0
*
* @param messageClass class of message
* @param domainClass subscriber for class of domain
* @param domain subscriber for domain
*/
@Retention(AnnotationRetention.RUNTIME)
@Target(AnnotationTarget.FUNCTION, AnnotationTarget.PROPERTY_GETTER, AnnotationTarget.PROPERTY_SETTER)
annotation class DomainSubscribe(
val messageClass: KClass<*>,
val domainClass: KClass<*> = Nothing::class,
val domain: String = ""
)

View File

@@ -29,7 +29,7 @@ package com.synebula.gaea.bus
* @author Cliff
* @since 10.0
*
* @param topics method subscribe topics, only for [com.synebula.gaea.bus.messagebus.MessageBus]
* @param topics method subscribe topics
*/
@Retention(AnnotationRetention.RUNTIME)
@Target(AnnotationTarget.FUNCTION, AnnotationTarget.PROPERTY_GETTER, AnnotationTarget.PROPERTY_SETTER)

View File

@@ -13,6 +13,7 @@
*/
package com.synebula.gaea.bus
import com.synebula.gaea.data.message.messageTopic
import com.synebula.gaea.reflect.supertypes
import java.lang.reflect.Method
import java.util.concurrent.ConcurrentHashMap
@@ -143,7 +144,16 @@ open class SubscriberRegistry<T : Any>(private val bus: IBus<T>) {
val methodsInListener = mutableMapOf<String, MutableList<Subscriber<T>>>()
val clazz: Class<*> = subscriber.javaClass
for (method in getAnnotatedMethods(clazz)) {
var topics = method.getAnnotation(Subscribe::class.java).topics
var topics: Array<String>
if (method.isAnnotationPresent(Subscribe::class.java))
topics = method.getAnnotation(Subscribe::class.java).topics
else {
val domainSubscribe = method.getAnnotation(DomainSubscribe::class.java)
var topic = messageTopic(domainSubscribe.domain, domainSubscribe.messageClass.java)
if (domainSubscribe.domainClass != Nothing::class)
topic = messageTopic(domainSubscribe.domainClass.java, domainSubscribe.messageClass.java)
topics = arrayOf(topic)
}
//如果没有定义topic则使用消息类名称做topic
if (topics.isEmpty()) {
@@ -191,19 +201,20 @@ open class SubscriberRegistry<T : Any>(private val bus: IBus<T>) {
protected fun getAnnotatedMethods(clazz: Class<*>): List<Method> {
var methods = subscriberMethodsCache[clazz]
if (methods == null)
methods = getAnnotatedMethodsNotCached(clazz, Subscribe::class.java)
methods = getAnnotatedMethodsNotCached(clazz, Subscribe::class.java, DomainSubscribe::class.java)
return methods
}
protected fun getAnnotatedMethodsNotCached(
clazz: Class<*>,
annotationClass: Class<out Annotation>,
vararg annotationClasses: Class<out Annotation>,
): List<Method> {
val supertypes = flattenHierarchy(clazz)
val identifiers = mutableMapOf<MethodIdentifier, Method>()
for (supertype in supertypes) {
for (method in supertype.declaredMethods) {
if (method.isAnnotationPresent(annotationClass) && !method.isSynthetic) {
val isAnnotationPresent = annotationClasses.any { method.isAnnotationPresent(it) }
if (isAnnotationPresent && !method.isSynthetic) {
val parameterTypes = method.parameterTypes
check(parameterTypes.size == 1) {
"Method $method has @SubscribeTopic annotation but has ${parameterTypes.size} parameters. Subscriber methods must have exactly 1 parameter."

View File

@@ -1,7 +1,5 @@
package com.synebula.gaea.data.message
import java.util.*
/**
*
* 用来统一Http返回消息类型通常使用json格式传递
@@ -15,18 +13,14 @@ open class DataMessage<T>() : StatusMessage() {
*/
var data: T? = null
/**
* 消息时间戳
*/
val timestamp: Long = Date().time
constructor(data: T) : this() {
this.data = data
}
@Suppress("")
constructor(status: Int, message: String) : this() {
this.status = status
this.message = message
message.also { this.message = it }
}
constructor(status: Int, data: T, message: String) : this(status, message) {

View File

@@ -0,0 +1,22 @@
package com.synebula.gaea.data.message
interface IEvent {
/**
* 获取事件的Topic
*
* @param domain 领域名称
*/
fun topic(domain: String): String {
return messageTopic(domain, this::class.java)
}
/**
* 获取事件的Topic
*
* @param domainClass 领域类
*/
fun topic(domainClass: Class<*>): String {
return messageTopic(domainClass, this::class.java)
}
}

View File

@@ -5,7 +5,7 @@ package com.synebula.gaea.data.message
*/
interface IMessage {
/**
* 命令载荷, 实际的业务数据
* 消息载荷, 实际的业务数据
*/
var message: String
@@ -13,4 +13,22 @@ interface IMessage {
* 时间戳。
*/
var timestamp: Long
/**
* 获取消息的Topic
*
* @param domain 领域名称
*/
fun topic(domain: String): String {
return messageTopic(domain, this::class.java)
}
/**
* 获取消息的Topic
*
* @param domainClass 领域类
*/
fun topic(domainClass: Class<*>): String {
return messageTopic(domainClass, this::class.java)
}
}

View File

@@ -0,0 +1,23 @@
package com.synebula.gaea.data.message
import com.synebula.gaea.ext.firstCharLowerCase
/**
* 获取事件的Topic
*
* @param domain 领域名称
* @param messageClass 消息的类型
*/
fun messageTopic(domain: String, messageClass: Class<*>): String {
return if (domain.isNotBlank()) "$domain.${messageClass.simpleName}" else messageClass.simpleName
}
/**
* 获取事件的Topic
*
* @param domainClass 领域类
* @param messageClass 消息的类型
*/
fun messageTopic(domainClass: Class<*>, messageClass: Class<*>): String {
return "${domainClass.simpleName.firstCharLowerCase()}.${messageClass.simpleName}"
}

View File

@@ -1,6 +1,8 @@
package com.synebula.gaea.data.message
open class StatusMessage {
import java.util.*
open class StatusMessage : IMessage {
/**
* 状态。200成功400错误500异常
*/
@@ -14,6 +16,10 @@ open class StatusMessage {
/**
* 附带提示消息
*/
var message = ""
override var message = ""
/**
* 时间戳。
*/
override var timestamp = Date().time
}

View File

@@ -1,5 +1,6 @@
package com.synebula.gaea.domain.event
import com.synebula.gaea.data.message.IEvent
import com.synebula.gaea.domain.model.IAggregateRoot
class AfterRemoveEvent<T : IAggregateRoot<I>, I>(var id: I? = null)
class AfterRemoveEvent<T : IAggregateRoot<I>, I>(var id: I? = null) : IEvent

View File

@@ -1,5 +1,6 @@
package com.synebula.gaea.domain.event
import com.synebula.gaea.data.message.IEvent
import com.synebula.gaea.domain.model.IAggregateRoot
class BeforeRemoveEvent<T : IAggregateRoot<I>, I>(var id: I? = null)
class BeforeRemoveEvent<T : IAggregateRoot<I>, I>(var id: I? = null) : IEvent

View File

@@ -7,7 +7,6 @@ import com.synebula.gaea.domain.event.AfterRemoveEvent
import com.synebula.gaea.domain.event.BeforeRemoveEvent
import com.synebula.gaea.domain.model.IAggregateRoot
import com.synebula.gaea.domain.repository.IRepository
import com.synebula.gaea.ext.firstCharLowerCase
import javax.annotation.Resource
@@ -81,16 +80,10 @@ open class Service<TRoot : IAggregateRoot<ID>, ID>(
*/
override fun remove(id: ID) {
val beforeRemoveEvent = BeforeRemoveEvent<TRoot, ID>(id)
this.bus?.publish(
"${this.clazz.simpleName.firstCharLowerCase()}${BeforeRemoveEvent::class.java.simpleName}",
beforeRemoveEvent
)
this.bus?.publish(beforeRemoveEvent.topic(this.clazz), beforeRemoveEvent)
this.repository.remove(id)
val afterRemoveEvent = AfterRemoveEvent<TRoot, ID>(id)
this.bus?.publish(
"${this.clazz.simpleName.firstCharLowerCase()}${AfterRemoveEvent::class.java.simpleName}",
afterRemoveEvent
)
this.bus?.publish(afterRemoveEvent.topic(this.clazz), afterRemoveEvent)
}
/**

View File

@@ -1,9 +1,13 @@
package com.synebula.gaea.domain.service
import com.synebula.gaea.bus.IBus
import com.synebula.gaea.data.message.DataMessage
import com.synebula.gaea.domain.event.AfterRemoveEvent
import com.synebula.gaea.domain.event.BeforeRemoveEvent
import com.synebula.gaea.domain.model.IAggregateRoot
import com.synebula.gaea.domain.repository.IRepository
import com.synebula.gaea.log.ILogger
import javax.annotation.Resource
/**
@@ -17,26 +21,32 @@ import com.synebula.gaea.log.ILogger
* @version 0.1
* @since 2020-05-17
*/
open class SimpleService<TAggregateRoot : IAggregateRoot<ID>, ID>(
protected open var clazz: Class<TAggregateRoot>,
protected open var repository: IRepository<TAggregateRoot, ID>,
open class SimpleService<TRoot : IAggregateRoot<ID>, ID>(
protected open var clazz: Class<TRoot>,
protected open var repository: IRepository<TRoot, ID>,
override var logger: ILogger,
) : ISimpleService<TAggregateRoot, ID> {
) : ISimpleService<TRoot, ID> {
@Resource
protected open var bus: IBus<Any>? = null
override fun add(root: TAggregateRoot): DataMessage<ID> {
override fun add(root: TRoot): DataMessage<ID> {
val msg = DataMessage<ID>()
this.repository.add(root)
msg.data = root.id
return msg
}
override fun update(id: ID, root: TAggregateRoot) {
override fun update(id: ID, root: TRoot) {
root.id = id
this.repository.update(root)
}
override fun remove(id: ID) {
val beforeRemoveEvent = BeforeRemoveEvent<TRoot, ID>(id)
this.bus?.publish(beforeRemoveEvent.topic(this.clazz), beforeRemoveEvent)
this.repository.remove(id)
val afterRemoveEvent = AfterRemoveEvent<TRoot, ID>(id)
this.bus?.publish(afterRemoveEvent.topic(this.clazz), afterRemoveEvent)
}
/**
@@ -44,7 +54,7 @@ open class SimpleService<TAggregateRoot : IAggregateRoot<ID>, ID>(
*
* @param roots 增加对象命令列表
*/
override fun add(roots: List<TAggregateRoot>) {
override fun add(roots: List<TRoot>) {
this.repository.add(roots)
}
@@ -53,7 +63,7 @@ open class SimpleService<TAggregateRoot : IAggregateRoot<ID>, ID>(
*
* @param roots 更新对象命令列表
*/
override fun update(roots: List<TAggregateRoot>) {
override fun update(roots: List<TRoot>) {
this.repository.update(roots)
}
}

View File

@@ -1,12 +0,0 @@
package com.synebula.gaea.ext
/**
* 获取对象字段信息字符串列表。
*/
fun Class<*>.fieldNames(): List<String> {
val names = mutableListOf<String>()
this.declaredFields.forEach { field ->
names.add(field.name)
}
return names
}

View File

@@ -12,7 +12,18 @@ fun String.firstCharLowerCase(): String {
this
else
StringBuilder().append(Character.toLowerCase(this.elementAt(0)))
.append(this.substring(1)).toString()
.append(this.substring(1)).toString()
}
/**
* 首字母小写
*/
fun String.firstCharUpperCase(): String {
return if (Character.isLowerCase(this.elementAt(0)))
this
else
StringBuilder().append(Character.toUpperCase(this.elementAt(0)))
.append(this.substring(1)).toString()
}
/**

View File

@@ -49,3 +49,20 @@ fun Class<*>.findField(name: String): Field? {
}
return field
}
/**
* 获取对象字段信息字符串列表。
*/
fun Class<*>.fieldNames(): List<String> {
val names = mutableListOf<String>()
for (f in this.declaredFields) {
names.add(f.name)
}
val superclass = this.superclass
if (superclass != Any::class.java) {
names.addAll(superclass.fieldNames())
}
return names
}