如何实现多个数据库分片的list_objects操作

业务场景

我们需要实现一个类似于Linux中ls命令的功能,用户可以用该功能来查看的bucket里边object列表,这些object信息存储在mysql meta表和shard meta表中;正常情况下一个bucket中的所有object存储在一张meta表中;然而当一个bucket中的object数量十分庞大时,我们采用了水平分表的方式,将这些object通过哈希方式分散到了1024个shard meta表中,以避免单表行数过大带来的性能问题。

因此,ls功能的实现需要考虑以上两种情景,分别是单表和多表的list;list功能需要支持delimiter来折叠文件夹(类似于linux中ls命令,根据该参数来决定是否展开当前目录下的子目录),marker来指定每次查询的起始位置,maxKeys来指定每次返回数目,prefix来筛选出以前缀开头的object。单表list我们可以直接采用mysql order来保证结果有序;如果object分散在了1024个shard meta表中,要每次拿出前n个就比较困难了,因为数据只是单表有序的,要想全局有序,就还得做一些处理。

分表hash方式

如何把很多object通过哈希方式分散到了1024个shard meta表中?如下代码所示,我们根据bucket和object构建一个url字符串,然后求MD5值,然后按4位做异或操作,最后对shard数目取模,就可以把object都随机分散到1024个表里了;MD5算法对原信息进行数学变换后得到的一个128位(bit)的特征码作为数据摘要,具有高度的离散性,原信息的一点点变化就会导致MD5的巨大变化。

void GenUrl(const std::string& bucket, const std::string& object, std::string& url) {
    url.append("bs://");
    url.append(bucket);
    url.append("/");
    url.append(object);
}

int32_t GetShardKey(const std::string &bucket, const std::string &object) {
    std::string url = "";
    GenUrl(bucket, object, url);
    std::string md5sum = MD5(url);
    const uint32_t *p = reinterpret_cast<const uint32_t*>(md5sum.data());
    return static_cast<int32_t>((p[0]^p[1]^p[2]^p[3]) % BUCKET_SHARD_NUM);
}

单表list

  • 解决方案

    • 每次查询都是根据请求参数组装成一条SQL,查出需要的object select object, etag, size,…… from meta where bucket_id = * and shard_key = * order by object limit 0,1001
    • 针对子文件夹的折叠处理:
      • ls操作需要支持折叠文件夹操作,以免一个文件夹下边有很多子文件夹,而且子文件夹里有很多文件的时候,会导致多次ls也一直在一个子文件夹里边,支持折叠子文件夹,用户才可以ls出文件夹下边的所有子文件夹
      • 解决方案:当遇到子文件夹中文件多的时候,每次都能list出指定数目的objects,这些objects可能都是一个子文件中的文件;因为用户需要折叠子文件夹,因此我们进行跳过子文件夹的处理,极端情况下这批objects都在一个子文件夹中,此时我们需要根据最后一个object name来判断是否在子文件中,如果该批objects在子文件中,此时我们会对下次查询的起始位置做一个++操作,确保下次查询跳过这个已经获取了的子文件夹;如此进行三次重试,至少保证一次请求可以拿出3个子文件夹给用户
    • marker的处理:如果需要跳过文件夹,对marker++;例如正常情况下,object > marker;如果要跳过文件夹,变成object >= marker++
  • 核心代码

    // 需要跳过文件夹时对marker的处理
    string BucketModel::ProcessMarkerWithDelimiter(const string& marker,
                        const string& prefix, char delimiter) {
        //prefix empty, or prefix not empty and marker start with prefix
        if (prefix.empty() || marker.compare(0, prefix.length(), prefix) == 0) {
            size_t pos = marker.find_first_of(delimiter, prefix.length());
            string new_marker;
            if (std::string::npos != pos) {
                ++pos;
                new_marker = marker.substr(0, pos);
                new_marker[new_marker.length()-1]++;
                return new_marker;
            } else {
                return marker;
            }
        }
        return marker;
    }
    

多表list

  • 解决方案

    • 利用Golang携程并发发送1024个shardMeta的list请求,也就是1024次SQL查询,拿到1024个请求结果之后,对结果做归并和提取处理,最终得出与单表listObject接口逻辑一致的结果。
  • 核心代码

    // merge two ObjectInfo struct type slice, max length is maxKeys
    func merge(maxKeys int, left, right []ObjectInfo) []ObjectInfo {
        var result []ObjectInfo
    
        for len(left) > 0 || len(right) > 0 {
            if len(left) == 0 {
                result = append(result, right...)
                break
            }
            if len(right) == 0 {
                result = append(result, left...)
                break
            }
            if left[0].Key <= right[0].Key {
                result = append(result, left[0])
                left = left[1:]
            } else {
                result = append(result, right[0])
                right = right[1:]
            }
            if len(result) >= maxKeys {
                return result[0:maxKeys]
            }
        }
        if len(result) >= maxKeys {
            return result[0:maxKeys]
        }
        return result
    }
    
  • 方案重难点

    • Cache嗅探机制
      List ShardMeta每次会通过VIP从Bucket取回1024个表数据,为了提高请求响应速度,当请求频率达到阈值时,这1024个表的数据会分别以ApiType+Bucket+ShardKey为key保存到Cache中,如何在充分利用Cache降低请求响应速度和对数据库压力的同时,也尽量减少与Cache之间的通信显得尤为重要。

      在充分考虑了系统Cache的实现机制之后,设计了预读取方案来降低与Cache之间的通信次数,将1024个ShardMeta的请求任务队列切割出一小部分(10个)作为嗅探Cache的任务,多个goroutine并发去读Cache,如果嗅探Cache的ShardMeta都能读取到,说明Cache中能读到1024个表数据,可以继续读Cache,否则后续请求都直接请求数据库,不再读Cache,这样可以把每次List Shard请求读Cache的次数从1024次降低到10次

    • 性能优化
      List ShardMeta是一个重请求,相当于所有操作开销都放大1024倍,然而又必须满足客户对耗时的要求,我主要从以下几个方面做了性能优化

      减少互斥锁粒度,充分利用Golang atomic函数:在归并1024个表数据时,只给归并过程和写最终结果的少量代码加锁,利用atomic变量做全局信号量,保证变量操作的原子性

      并发Goroutine发送请求和进行Json-Struct转换:利用Golang Goroutine给Bucket并发发请求,加快网络请求速度;归并结果时需要多次进行byte数组和Struct之间的转换,而Golang自带json库函数效率低,也将该过程利用Goroutine来并发完成,加快转换速度

      通过VIP分发请求:1024个请求如果并发到本机Bucket服务,会对本机服务造成巨大压力,而且由于单进程资源有限,会导致响应速度很慢。因此采用了将1024个请求发送给VIP的方式,均衡地分散到服务集群数千台机器上,保证了处理速度,降低了单机压力

    • 异常处理机制
      List ShardMeta每次1024个数据库请求,如果有一个请求遇到网络超时等错误,返回结果不正常,就会导致整个请求返回结果有误;因此我采用了标志量和超时机制来实现异常处理,一旦单个请求出现异常,标志量置位TRUE,其他请求不再继续;同时采用Goroutine等待超时信号量的方式来处理超时情况,如果工作任务出现错误,一旦从任务通道取不到任务超过50ms,Goroutine自动结束操作,给用户返回错误提示

json转换函数性能低的解决方案

Golang自带json Unmarshal函数转化包含1000个obj的list object接口返回结果时,耗时10ms,效率很低;主要有两种解决方案

  • 一种是减少转化次数以及缩小锁的范围,并发地去做转化
  • 另一种是使用开源库,性能能够提升2-4倍,例如easyjson,ffjson

并发等待是等goroutine还是Channel

Goroutine一般会结合WaitGroup来使用,wg相当于一个同步信号量,等到wg减到0,才开始下一步的逻辑

  • 如果wg等待goroutine,也就是说先给wg.add(Goroutine数目),然后使用defer this.wg.Done()return来指定当函数结束(也意味着Goroutine结束)时就给wg减一,这样能确保最终协程都结束时,wg也不再等待,开始下一步的逻辑
  • 如果wg等待channel里边的任务,会存在一些异常问题,例如channel任务没被消费完,Goroutine都异常中止了,那么wg永远等不到减到0的那一刻,程序就会hang住了

耗时正比例增加因为多打了log

上线之后从某一天开始发现list请求耗时突增到上线时的6倍,非常奇怪,而且耗时随着每次请求数据量的增加而增加,经过git diff版本分析代码,发现是后续debug增加了几行日志打印,打印数据为每个shardMeta返回的数据,而且打印日志的代码被mutex互斥锁锁住,也就意味着每次要串行打印n个object数据,打印1024次,每次耗时10ms,从而导致耗时剧增;后来删除log之后耗时恢复到正常水平

Golang相关技术和踩坑

defer和WaitGroup
  1. 在Golang中,defer表达式通常用来处理一些清理和释放资源的操作。defer后面的表达式会被放入一个列表中,在当前方法返回的时候,列表中的表达式就会被执行。一个方法中可以在一个或者多个地方使用defer表达式
  2. defer表达式中变量的值在defer表达式被定义时就已经明确
  3. defer表达式的调用顺序是按照先进后出的方式
  4. defer还可以用于在 return 之后修改函数的返回值
  5. Go语言中, panic用于抛出异常, recover用于捕获异常.

参考链接

1.Golang中defer的那些事