高并发脏数据容器设计

前言

考虑以下需求:

  1. 实现一个可以缓存数据的容器,采取K-V方式存取键值,容器可以接受的值要求支持Int,Double,String,List,Map和Set
  2. 要求容器能够在数据发生变动后记录变动数据的键值
  3. 容器可以在高并发场景下表现良好

应用例子:作为应用层和持久化层之间的Write Back型缓存,定期冲刷容器中发生变化的数据到持久化层进行保存

环境:Kotlin 1.9.24(JVM)

实现

首先考虑1,容易知,容器内部维护一个Map<String, Any>可以满足要求,再考虑3知采用JUC下的并发集合ConcurrentHashMap可以保证内部Map的数据增删改查是线程安全的

再考虑1和2,容器可以接受的数据类型是多样的,有基本类型(Int,Double,String)也有集合类型(List,Map,Set),并且还要求数据改动后能够被记录,所以先考虑抽象出一个整体的数据类接口Data

kotlin 复制代码
interface Data {
    /**
     * 数据类型
     */
    val type: DataType

    /**
     * 获取原始类型
     */
    fun get(): Any

    /**
     * 是否是脏数据
     */
    fun isDirty(): Boolean

    /**
     * 取消脏标记
     */
    fun markClean(): Boolean
}

enum class DataType {
    INT,
    DOUBLE,
    STRING,
    LIST,
    MAP,
    SET;
}

之后分别设计基本类型和集合类型的实现,首先考虑基本类型
使用泛型参数,后面设计统一的createData()方法执行实例化时传入Int,Double还是String

kotlin 复制代码
class PrimitiveData<T: Any>(
    override val type: DataType,
    private var value: T
) : Data {
    private var _isDirty = AtomicBoolean(false)

    fun set(value: T) {
        if (this.value != value) {
            this.value = value
            _isDirty.set(true)
        }
    }

    override fun get(): T {
        return value
    }

    override fun isDirty(): Boolean {
        return _isDirty.get()
    }

    override fun markClean(): Boolean {
        return _isDirty.set(false)
    }
}

考虑到要保证_isDirty的线程安全,所以采用AtomicBoolean确保getter和setter的原子性,在使用set(value)修改数据后,若数据有变动,则将脏标记设置为true

之后考虑集合类型

kotlin 复制代码
/**
 * List类型
 */
class ListData<E>(list: List<E>) : Data {
    private val _list: CopyOnWriteArrayList<E> = CopyOnWriteArrayList(list)
    private var _isDirty = AtomicBoolean(false)
    override val type = DataType.LIST

    fun modify(operation: Predicate<MutableList<E>>) {
        if (operation.test(_list)) _isDirty.set(true)
    }

    override fun get(): List<E> {
        return _list
    }

    override fun isDirty(): Boolean {
        return _isDirty.get()
    }

    override fun tryMarkClean(): Boolean {
        return _isDirty.set(false)
    }
}

/**
 * Map类型
 */
class MapData<K, V>(map: Map<K, V>) : Data {
    private val _map = ConcurrentHashMap(map)
    private var _isDirty = AtomicBoolean(false)
    override val type = DataType.MAP

    fun modify(operation: Predicate<MutableMap<K, V>>) {
        if (operation.test(_map)) _isDirty.set(true)
    }

    override fun get(): Map<K, V> {
        return _map
    }

    override fun isDirty(): Boolean {
        return _isDirty.get()
    }

    override fun tryMarkClean(): Boolean {
        return _isDirty.set(false)
    }
}

/**
 * Set类型
 */
class SetData<E>(set: Set<E>) : Data {
    private val _set: CopyOnWriteArraySet<E> = CopyOnWriteArraySet(set)
    private var _isDirty = AtomicBoolean(false)
    override val type = DataType.SET

    fun modify(operation: Predicate<MutableSet<E>>) {
        if (operation.test(_set)) _isDirty.set(true)
    }

    override fun get(): Set<E> {
        return _set
    }

    override fun isDirty(): Boolean {
        return _isDirty.get()
    }

    override fun markClean(): Boolean {
        return _isDirty.set(false)
    }
}

考虑在获取集合时提供不可变的集合接口,在修改集合时提供代理的修改方法modify(operation),在修改成功后设置脏标记为true,同时使用JUC下的线程安全集合CopyOnWriteArrayList/SetConcurrentHashMap保证内部集合的增删改查是线程安全的

之后考虑实现数据容器

kotlin 复制代码
class DirtyDataContainer {
    /** 维护一个ConcurrentHashMap **/
    private val dataMap = ConcurrentHashMap<String, Data>()

    /**
     * 加入数据
     * @return 是否成功
     */
    fun put(key: String, value: Any): Boolean {
        val data = createData(value)
        dataMap[key] = data
	//总是成功
	return true
    }

    /**
     * 移除数据
     * @return 是否成功(不存在时返回true)
     */
    fun remove(key: String): Boolean {
	if(!dataMap.containsKey(key)) return true
	return dataMap.remove(key)
    }

    /**
     * 获取数据
     */
    @Suppress("UNCHECKED_CAST")
    fun <T : Data> get(key: String): T {
        val data = dataMap[key] ?: throw NoSuchElementException("未找到: $key")
        return data as T
    }

    /**
     * 冲刷变动的数据
     * @return (键 -> 原始值)
     */
    fun flush(): Map<String, Any> {
        val result = mutableMapOf<String, Any>()
        dataMap.forEach { (key, data) ->
            if (data.isDirty()) {
                result[key] = data.getValue()
		data.markClean()
            }
        return result
    }

    private fun createData(value: Any): Data {
        return when (value) {
            is Int -> PrimitiveData(DataType.INT, value)
            is Double -> PrimitiveData(DataType.DOUBLE, value)
            is String -> PrimitiveData(DataType.STRING, value)
            is List<*> -> ListData(value)
            is Map<*, *> -> MapData(value)
            is Set<*> -> SetData(value)
            else -> throw IllegalArgumentException("不支持的类型: ${value.javaClass}")
        }
    }

以上设计是有问题的
内部Map容器的增删改查是线程安全的,但是外部容器存放数据的操作不是线程安全的,尤其是flush()方法,isDirty()markClean()函数的调用不是原子的,而且在遍历dataMap的过程中,数据可能会被另一个线程修改

所以,考虑对容器的冲刷操作上锁以保证线程安全,考虑到容器每次冲刷过程中脏数据总是占小部分,绝大部分数据是不需要重设脏标记的,也就是说在未有新数据存入和老数据取出的情况下这个脏数据容器总体是读多写少的,因此使用简单的Synchronized块/Reentrantlock阻塞其他线程的所有操作无疑会浪费性能,而采用ReadWriteLock这种悲观读锁在高并发环境下依然会造成锁频繁被获取和释放,考虑到在读线程进行操作的过程中并不一定有写入操作发生,这样做的性能开销依然是比较大的

所以,最佳的方式应该是使用StampedLock,先通过乐观读锁(无锁操作)读取数据,之后判断读的过程中是否发生了写的操作,如果没有继续执行任务,如果有的话,再切换为悲观读锁,阻塞其他写入线程的操作进行读取

同时我们给容器再加入一个isChanged字段获取容器整体的所有数据中是否有发生变动的数据

kotlin 复制代码
class DirtyDataContainer {
    /** 维护一个ConcurrentHashMap **/
    private val dataMap = ConcurrentHashMap<String, Data>()
    private val lock = StampedLock()

    /**
     * 数据是否发生变动
     */
    val isChanged: Boolean
        get() {
            // 先尝试乐观读
            val stamp = lock.tryOptimisticRead()
            val result = dataMap.any { it.value.isDirty() }
            // 验证读取的数据是否有效
            return if (lock.validate(stamp)) {
                result
            } else {
                // 如果乐观读失败,回退到悲观读
                val readStamp = lock.readLock()
                try {
                    dataMap.any { it.value.isDirty() }
                } finally {
                    lock.unlockRead(readStamp)
                }
            }
        }

    /**
     * 加入数据
     * @return 是否成功
     */
    fun put(key: String, value: Any): Boolean {
        val data = createData(value)
        dataMap[key] = data
	//总是成功
	return true
    }

    /**
     * 移除数据
     * @return 是否成功(不存在时返回true)
     */
    fun remove(key: String): Boolean {
	if(!dataMap.containsKey(key)) return true
	return dataMap.remove(key)
    }

    /**
     * 获取数据
     */
    @Suppress("UNCHECKED_CAST")
    fun <T : Data> get(key: String): T {
        val data = dataMap[key] ?: throw NoSuchElementException("未找到: $key")
        return data as T
    }

    /**
     * 冲刷变动的数据
     * @return (键 -> 原始值)
     */
    fun flush(): Map<String, Any> {
        //先尝试乐观读
        var stamp = lock.tryOptimisticRead()
        //首先一次性获取所有脏数据键的副本
        var dirtyDataSnapshotKeys = getDirtySnapshotKeys()
        //验证在读取时是否有写操作
        if (!lock.validate(stamp)) {
            //升级为悲观读
            stamp = lock.readLock()
            try {
                dirtyDataSnapshotKeys = getDirtySnapshotKeys()
            } finally {
                lock.unlockRead(stamp)
            }
        }

        val result = mutableMapOf<String, Any>()
        //批量写锁
        val writeStamp = lock.writeLock()
        try {
            dirtyDataSnapshotKeys.forEach { key ->
                val data = dataMap[key] ?: return@forEach
                //再次原子检查数据的脏标记
                if (data.isDirty() && data.tryMarkClean()) {
                    result[key] = data.get()
                }
            }
        } finally {
            lock.unlockWrite(writeStamp)
        }

        return result
    }

    private fun getDirtySnapshotKeys(): List<String> {
        val dirtyDataSnapshotKeys = mutableListOf<String>()
        dataMap.forEach { (key, data) ->
            if (data.isDirty()) {
                dirtyDataSnapshotKeys.add(key)
            }
        }

        return dirtyDataSnapshotKeys
    }

    private fun createData(value: Any): Data {
        return when (value) {
            is Int -> PrimitiveData(DataType.INT, value)
            is Double -> PrimitiveData(DataType.DOUBLE, value)
            is String -> PrimitiveData(DataType.STRING, value)
            is List<*> -> ListData(value)
            is Map<*, *> -> MapData(value)
            is Set<*> -> SetData(value)
            else -> throw IllegalArgumentException("不支持的类型: ${value.javaClass}")
        }
    }
}

flush()方法中内部dataMap的遍历做优化,考虑到遍历过程中数据可能被改变,所以先上StampedLock的锁获取当前脏数据的键,之后批量上写锁阻塞所有的写线程对之前是脏数据的键进行统一的判断和冲洗,与此同时,将flush()isDirty()markClean()的非原子操作装配成统一的tryMarkClean()原子操作

kotlin 复制代码
interface Data {
    /**
     * 数据类型
     */
    val type: DataType

    /**
     * 获取原始类型
     */
    fun get(): Any

    /**
     * 是否是脏数据
     */
    fun isDirty(): Boolean

    /**
     * 尝试取消脏标记
     */
    fun tryMarkClean(): Boolean
}

/**
 * 原始类型(Int, Double, String)
 */
class PrimitiveData<T: Any>(
    override val type: DataType,
    private var value: T
) : Data {
    private var _isDirty = AtomicBoolean(false)

    fun set(value: T) {
        if (this.value != value) {
            this.value = value
            _isDirty.compareAndSet(false, true)
        }
    }

    override fun get(): T {
        return value
    }

    override fun isDirty(): Boolean {
        return _isDirty.get()
    }

    override fun tryMarkClean(): Boolean {
        return _isDirty.compareAndSet(true, false)
    }
}

/**
 * List类型
 */
class ListData<E>(list: List<E>) : Data {
    private val _list: CopyOnWriteArrayList<E> = CopyOnWriteArrayList(list)
    private var _isDirty = AtomicBoolean(false)
    override val type = DataType.LIST

    fun modify(operation: Predicate<MutableList<E>>) {
        if (operation.test(_list)) _isDirty.compareAndSet(false, true)
    }

    override fun get(): List<E> {
        return _list
    }

    override fun isDirty(): Boolean {
        return _isDirty.get()
    }

    override fun tryMarkClean(): Boolean {
        return _isDirty.compareAndSet(true, false)
    }
}

/**
 * Map类型
 */
class MapData<K, V>(map: Map<K, V>) : Data {
    private val _map = ConcurrentHashMap(map)
    private var _isDirty = AtomicBoolean(false)
    override val type = DataType.MAP

    fun modify(operation: Predicate<MutableMap<K, V>>) {
        if (operation.test(_map)) _isDirty.compareAndSet(false, true)
    }

    override fun get(): Map<K, V> {
        return _map
    }

    override fun isDirty(): Boolean {
        return _isDirty.get()
    }

    override fun tryMarkClean(): Boolean {
        return _isDirty.compareAndSet(true, false)
    }
}

/**
 * Set类型
 */
class SetData<E>(set: Set<E>) : Data {
    private val _set: CopyOnWriteArraySet<E> = CopyOnWriteArraySet(set)
    private var _isDirty = AtomicBoolean(false)
    override val type = DataType.SET

    fun modify(operation: Predicate<MutableSet<E>>) {
        if (operation.test(_set)) _isDirty.compareAndSet(false, true)
    }

    override fun get(): Set<E> {
        return _set
    }

    override fun isDirty(): Boolean {
        return _isDirty.get()
    }

    override fun tryMarkClean(): Boolean {
        return _isDirty.compareAndSet(true, false)
    }
}

最后,考虑到一些情况下要求数据存放/取出容器后另一些地方能够立刻得知变化,并且可能有某些情况下需要决定是否允许数据能够存放/取出容器,设计一个事件工厂类

kotlin 复制代码
interface DataContainerEventFactory {

    fun callPreSet(container: DirtyDataContainer, key: String, value: Data): Boolean

    fun callPostSet(container: DirtyDataContainer, key: String, value: Data)

    fun callPreRemove(container: DirtyDataContainer, key: String): Boolean

    fun callPostRemove(container: DirtyDataContainer, key: String)

    companion object {
        val EMPTY = object : DataContainerEventFactory {

            override fun callPreSet(container: DirtyDataContainer, key: String, value: Data): Boolean {
                return true
            }

            override fun callPostSet(container: DirtyDataContainer, key: String, value: Data) {
            }

            override fun callPreRemove(container: DirtyDataContainer, key: String): Boolean {
                return true
            }

            override fun callPostRemove(container: DirtyDataContainer, key: String) {
            }
        }
}

修改数据容器,在数据存入/取出时Call对应的Pre/Post事件以决定数据是否可以存入/取出和通知已经存入/取出

kotlin 复制代码
class DirtyDataContainer(private var eventFactory: DataContainerEventFactory) {
    ...
    /**
     * 加入数据
     * @return 是否成功
     */
    fun put(key: String, value: Any): Boolean {
        val data = createData(value)
        val stamp = lock.writeLock()
        try {
            if (eventFactory.callPreSet(this, key, data)) {
                dataMap[key] = data
                eventFactory.callPostSet(this, key, data)
                return true
            }

            return false
        } finally {
            lock.unlockWrite(stamp)
        }
    }

    /**
     * 移除数据
     * @return 是否成功(不存在时返回true)
     */
    fun remove(key: String): Boolean {
        val stamp = lock.writeLock()
        try {
            if (!dataMap.containsKey(key)) return true
            if (eventFactory.callPreRemove(this, key)) {
                dataMap.remove(key)
                eventFactory.callPostRemove(this, key)
                return true
            }
            return false
        } finally {
            lock.unlockWrite(stamp)
        }
    }
    ...
}

之后用户只需要实现DataContainerEventFactory接口,实现自己的事件发布逻辑后用其初始化脏数据容器,便可订阅自己发布的事件以获取数据存入/取出容器的成功状态

更进一步

以上的脏数据容器勉强地实现了脏数据的维护和处理,但若作为作为L1缓存依然缺少过期策略等等特征

为此,可以考虑将容器内部维护的dataMap换成Guava Cache/Caffeine/Ehcache等高并发缓存框架的缓存来获取带有缓存策略的脏数据容器

游客

全部评论 (0)

暂无评论,快来抢沙发吧~