1什么是內(nèi)存池
1.1池化技術(shù)
所謂“池化技術(shù)”,就是程序先向系統(tǒng)申請(qǐng)過(guò)量的資源,然后自己管理,以備不時(shí)之需。之所以要申請(qǐng)過(guò) 量的資源,是因?yàn)槊看紊暾?qǐng)?jiān)撡Y源都有較大的開(kāi)銷(xiāo),不如提前申請(qǐng)好了,這樣使用時(shí)就會(huì)變得非常快 捷,大大提高程序運(yùn)行效率。 在計(jì)算機(jī)中,有很多使用“池”這種技術(shù)的地方,除了內(nèi)存池,還有連接池、線(xiàn)程池、對(duì)象池等。以服務(wù) 器上的線(xiàn)程池為例,它的主要思想是:先啟動(dòng)若干數(shù)量的線(xiàn)程,讓它們處于睡眠狀態(tài),當(dāng)接收到客戶(hù)端 的請(qǐng)求時(shí),喚醒池中某個(gè)睡眠的線(xiàn)程,讓它來(lái)處理客戶(hù)端的請(qǐng)求,當(dāng)處理完這個(gè)請(qǐng)求,線(xiàn)程又進(jìn)入睡眠 狀態(tài)。
1.2內(nèi)存池
內(nèi)存池是指程序預(yù)先從操作系統(tǒng)申請(qǐng)一塊足夠大內(nèi)存,此后,當(dāng)程序中需要申請(qǐng)內(nèi)存的時(shí)候,不是直接 向操作系統(tǒng)申請(qǐng),而是直接從內(nèi)存池中獲取;同理,當(dāng)程序釋放內(nèi)存的時(shí)候,并不真正將內(nèi)存返回給操 作系統(tǒng),而是返回內(nèi)存池。當(dāng)程序退出(或者特定時(shí)間)時(shí),內(nèi)存池才將之前申請(qǐng)的內(nèi)存真正釋放。減少用戶(hù)層向系統(tǒng)層的內(nèi)存申請(qǐng)調(diào)用,實(shí)現(xiàn)高效;
2.開(kāi)胃菜-設(shè)計(jì)一個(gè)定長(zhǎng)內(nèi)存池
此次設(shè)計(jì)的定長(zhǎng)內(nèi)存池ObjectPool結(jié)構(gòu)如下:(其單位小內(nèi)存塊的大小為T(mén)類(lèi)型的大小)
- 需要申請(qǐng)內(nèi)存時(shí),一次性申請(qǐng)一個(gè)大塊空間,記錄起始位置_memory;
- 需要使用size大小時(shí),將大塊內(nèi)存抽象切割成小塊,將起始位置_memory向后移動(dòng)size大小;
- 釋放內(nèi)存時(shí),將需要釋放的空間掛入自由鏈表_freeList,可供下次申請(qǐng)使用;注意:這個(gè)鏈表不是通過(guò)內(nèi)部指針連接下一個(gè),而是通過(guò)前一塊空間的前4or8個(gè)字節(jié)記錄后一個(gè)小空間的起始地址抽象連接的;
這樣做的目的是循環(huán)利用預(yù)先開(kāi)辟的一大塊空間,減少用戶(hù)層申請(qǐng)內(nèi)存時(shí)與系統(tǒng)層的交互,提高效率;
代碼實(shí)現(xiàn):
ObjectPool.h
#define _CRT_SECURE_NO_WARNINGS 1
#include
using namespace std;
#ifdef _WIN32
#include
#else
// 包Linux相關(guān)頭文件,增加代碼的可移植性;
#endif
// 直接去堆上按頁(yè)申請(qǐng)空間 擺脫malloc;
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
// linux下brk mmap等
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
}
template
class ObjectPool
{
char* _memory = nullptr; // 指向大塊內(nèi)存的指針
size_t _remainBytes = 0; // 大塊內(nèi)存在切分過(guò)程中剩余字節(jié)數(shù)
void* _freeList = nullptr; // 還回來(lái)過(guò)程中鏈接的自由鏈表的頭指針
public:
T* New()
{
T* obj = nullptr;
//如果之前有申請(qǐng)的空間被釋放,那就先從free了的空間拿;(內(nèi)存池技術(shù))
if (_freeList != nullptr)
{
//頭刪
//注意!*((void**)_freeList)相當(dāng)訪取_freeList前4or8個(gè)字節(jié)操作(由32位系統(tǒng)->指針4byte and 64位系統(tǒng)->指針8byte決定);
void* next = *((void**)_freeList);
obj = (T*)_freeList;
_freeList = next;
}
else
{
if (_remainBytes < sizeof(T))
{
// 剩余內(nèi)存不夠一個(gè)對(duì)象大小時(shí),則重新開(kāi)大塊空間
_remainBytes = 128 * 1024;
//_memory = (char*)malloc(_remainBytes);
//使用SystemAlloc直接向堆申請(qǐng)內(nèi)存,脫離malloc,方便體現(xiàn)malloc和該ObjectPool的差別;
_memory = (char*)SystemAlloc(_remainBytes >> 13);//char*類(lèi)型內(nèi)存可以使其+1or-1的單位操作為1字節(jié),方便內(nèi)存管理;
if (_memory == nullptr)
{
throw std::bad_alloc();
}
}
//防止某個(gè)T類(lèi)還沒(méi)當(dāng)前系統(tǒng)下一個(gè)指針大小大 那么就裝不下后一個(gè)的地址了,這里做特殊處理;至少保證一個(gè)對(duì)象內(nèi)足夠裝的下一個(gè)指針大小;
int Objsize = sizeof(T) < sizeof(void*) ? sizeof(void*): sizeof(T);
obj = (T*)_memory;
_memory += Objsize;
_remainBytes -= Objsize;
}
// 定位new,內(nèi)置類(lèi)型不處理,自定義類(lèi)型調(diào)用其構(gòu)造函數(shù);
new(obj)T;
return obj;
}
void Delete(T* obj)
{
//內(nèi)置類(lèi)型不處理,自定義類(lèi)型調(diào)用其構(gòu)析構(gòu)函數(shù);
obj->~T();
//頭插(此處不是真正的刪除,而是標(biāo)志為未使用,掛入自由鏈表)
*(void**)obj = _freeList;
_freeList = obj;
}
};
ObjectPool.cpp測(cè)試
#include
#include "ConcurrentAlloc.h"
//用于測(cè)試的類(lèi)型樹(shù)節(jié)點(diǎn);
struct TreeNode
{
int _val;
TreeNode* _left;
TreeNode* _right;
TreeNode()
:_val(0)
, _left(nullptr)
, _right(nullptr)
{}
};
void TestObjectPool()
{
// 申請(qǐng)釋放的輪次
const size_t Rounds = 5;
// 每輪申請(qǐng)釋放多少次
const size_t N = 100000;
std::vector v1;
v1.reserve(N);
size_t begin1 = clock();
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
{
v1.push_back(new TreeNode);
}
for (int i = 0; i < N; ++i)
{
delete v1[i];
}
v1.clear();
}
size_t end1 = clock();
std::vector v2;
v2.reserve(N);
ObjectPool TNPool;
size_t begin2 = clock();
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
{
v2.push_back(TNPool.New());
}
for (int i = 0; i < N; ++i)
{
TNPool.Delete(v2[i]);
}
v2.clear();
}
size_t end2 = clock();
cout << "new cost time:" << end1 - begin1 << endl;
cout << "object pool cost time:" << end2 - begin2 << endl;
}
int mian()
{
TestObjectPool();
return 0;
}*>*>
運(yùn)行結(jié)果:
顯然,定長(zhǎng)內(nèi)存池的New空間比new(底層封裝的是malloc申請(qǐng)空間)更高效
3.TCmalloc(高并發(fā)內(nèi)存池)整體框架介紹:
谷歌開(kāi)源項(xiàng)目:TCMalloc (google-perftools) 是用于優(yōu)化C++寫(xiě)的多線(xiàn)程應(yīng)用,比glibc 2.3的malloc快。這個(gè)模塊可以用來(lái)讓MySQL在高并發(fā)下內(nèi)存占用更加穩(wěn)定。
ThreadCache
thread cache:線(xiàn)程緩存是每個(gè)線(xiàn)程獨(dú)有的,用于小于256KB的內(nèi)存的分配(設(shè)計(jì)規(guī)定),線(xiàn)程從這里申請(qǐng)內(nèi)存不需要加鎖,每個(gè)線(xiàn)程獨(dú)享一個(gè)cache,這也就是這個(gè)并發(fā)線(xiàn)程池高效的地方。核心結(jié)構(gòu)是FreeList _freeLists[NFREELIST];eg:_freeLists[2]代表該size對(duì)應(yīng)哈希桶中有n個(gè)未使用的obj size大小的小內(nèi)存塊,當(dāng)需要申請(qǐng)時(shí)優(yōu)先從_freeList中拿無(wú)人使用的被掛起的obj空間;
CentralCache
central cache:中心緩存是所有線(xiàn)程所共享,thread cache是按需從central cache中獲取的對(duì) 象。central cache合適的時(shí)機(jī)回收thread cache中的對(duì)象,避免一個(gè)線(xiàn)程占用了太多的內(nèi)存,而 其他線(xiàn)程的內(nèi)存吃緊,達(dá)到內(nèi)存分配在多個(gè)線(xiàn)程中更均衡的按需調(diào)度的目的。central cache是存在競(jìng)爭(zhēng)的,所以從這里取內(nèi)存對(duì)象是需要加鎖,首先這里用的是桶鎖,其次只有thread cache的 沒(méi)有內(nèi)存對(duì)象時(shí)才會(huì)找central cache,所以這里競(jìng)爭(zhēng)不會(huì)很激烈。其核心結(jié)構(gòu)是SpanList _spanLists[NFREELIST];eg:_spanLists[2]代表該size對(duì)應(yīng)鏈桶中的n個(gè)span,每個(gè)span下面掛了n個(gè)大小為size的obj小內(nèi)存塊;(size大小是指通過(guò)測(cè)試和計(jì)算,將所需內(nèi)存大小x字節(jié)依據(jù)某種對(duì)齊規(guī)則對(duì)其為size字節(jié),使span的鏈?zhǔn)酵敖Y(jié)構(gòu)數(shù)量合適,大小統(tǒng)一)
page cache
page cache:頁(yè)緩存是在central cache緩存上面的一層緩存,存儲(chǔ)的內(nèi)存是以頁(yè)為單位存儲(chǔ)及分 配的,central cache沒(méi)有內(nèi)存對(duì)象時(shí),從page cache分配出一定數(shù)量的page,并切割成定長(zhǎng)大小 的小塊內(nèi)存,分配給central cache。當(dāng)一個(gè)span的幾個(gè)跨度頁(yè)的對(duì)象都回收以后,page cache 會(huì)回收central cache滿(mǎn)足條件的span對(duì)象,并且合并相鄰的頁(yè),組成更大的頁(yè),緩解內(nèi)存碎片 的問(wèn)題。Page中的核心結(jié)構(gòu)是SpanList _spanLists[NPAGES];eg:_spanLists[2]代表該鏈桶含有n個(gè)大小為2頁(yè)(2*8字節(jié))的span;如果從page[1]到page[128]都沒(méi)有可用span,那么只能從系統(tǒng)堆直接申請(qǐng)一塊大空間放進(jìn)去,再給central thread層用了;
本文TCmalloc整體項(xiàng)目圍繞上述三層結(jié)構(gòu)實(shí)現(xiàn),抽取原項(xiàng)目的小部分核心內(nèi)容學(xué)習(xí);這個(gè)項(xiàng)目會(huì)用到C/C++、數(shù)據(jù)結(jié)構(gòu)(鏈表、哈希桶)、操作系統(tǒng)內(nèi)存管理、單例模式、多線(xiàn)程、互斥鎖 等等方面的知識(shí)。
4.代碼實(shí)現(xiàn):
Common.h
這是一個(gè)通用頭文件,包含各種所需要的庫(kù)文件和三層結(jié)構(gòu)需要的對(duì)其數(shù)和size-申請(qǐng)數(shù)量轉(zhuǎn)換的函數(shù);
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
using std::cout;
using std::endl;
#ifdef _WIN32
#include
#else
// ...
#endif
static const size_t MAX_BYTES = 256 * 1024;//ThreadCache中允許申請(qǐng)的最大字節(jié);
static const size_t NFREELIST = 208;//通過(guò)對(duì)齊計(jì)算的thread和central中最大哈希桶下表;
static const size_t NPAGES = 129;//最多128頁(yè),為了方便映射哈希,從第1頁(yè)開(kāi)始計(jì)算;
static const size_t PAGE_SHIFT = 13;//一頁(yè)==8kb==2^13字節(jié)大小 所以 p地址>>PAGE_SHIFT ==PAGE_ID 可相當(dāng)于將某連續(xù)地址以2^13字節(jié)對(duì)齊,并標(biāo)上頁(yè)號(hào)方便管理;eg:0x00000000~0x2^13這連續(xù)的2^13個(gè)地址 經(jīng)過(guò)>> 為PAGE_ID==1,后連續(xù)的2^13個(gè)地址 經(jīng)過(guò)>> 為PAGE_ID==2;
//多系統(tǒng)編程;
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#else
// linux
#endif
// 直接去堆上按頁(yè)申請(qǐng)空間
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
// linux下brk mmap等
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
}
inline static void SystemFree(void* ptr)
{
#ifdef _WIN32
VirtualFree(ptr, 0, MEM_RELEASE);
#else
// sbrk unmmap等
#endif
}
//NextObj(obj) 等價(jià)于 *(void**)obj,取obj前4or8個(gè)字節(jié)(存放下一個(gè)小空間地址的位置)進(jìn)行操作,增加語(yǔ)義;
static void*& NextObj(void* obj)
{
return *(void**)obj;
}
// 管理切分好的小對(duì)象的自由鏈表
class FreeList
{
public:
void Push(void* obj)
{
assert(obj);
// 頭插
//*(void**)obj = _freeList;
NextObj(obj) = _freeList;
_freeList = obj;
++_size;
}
//從central中申請(qǐng)一批obj大小的內(nèi)存塊,range插入;
void PushRange(void* start, void* end, size_t n)
{
NextObj(end) = _freeList;
_freeList = start;
_size += n;
}
///threadcache還一段list給central cache
void PopRange(void*& start, void*& end, size_t n)//n==list.size
{
assert(n <= _size);
start = _freeList;
end = start;
for (size_t i = 0; i < n - 1; ++i)
{
end = NextObj(end);
}
_freeList = NextObj(end);
NextObj(end) = nullptr;
_size -= n;
}
void* Pop()
{
assert(_freeList);
// 頭刪
void* obj = _freeList;
_freeList = NextObj(obj);
--_size;
return obj;
}
bool Empty()
{
return _freeList == nullptr;
}
size_t& MaxSize()
{
return _maxSize;
}
size_t Size()
{
return _size;
}
private:
void* _freeList = nullptr;
size_t _maxSize = 1;//慢開(kāi)始算法--從1啟動(dòng);
size_t _size = 0;//當(dāng)前_freeList桶里的obj小空間個(gè)數(shù)
};
// 計(jì)算對(duì)象大小的對(duì)齊映射規(guī)則 方便控制哈希桶用的數(shù)量(不要太多) 便于管理obj小內(nèi)存空間
class SizeClass
{
public:
// 整體控制在最多10%左右的內(nèi)碎片浪費(fèi)的對(duì)齊規(guī)則;
// [1,128] 8byte對(duì)齊 freelist[0,16) eg:1~128字節(jié)的obj 按照8byte對(duì)齊(eg:1~7字節(jié)的對(duì)象都放入_freeList[0]) 則1~128字節(jié)的obj 需要的桶index為0~16即可;
// [128+1,1024] 16byte對(duì)齊 freelist[16,72)
// [1024+1,8*1024] 128byte對(duì)齊 freelist[72,128)
// [8*1024+1,64*1024] 1024byte對(duì)齊 freelist[128,184)
// [64*1024+1,256*1024] 8*1024byte對(duì)齊 freelist[184,208)
/*size_t _RoundUp(size_t size, size_t alignNum)
{
size_t alignSize;
if (size % alignNum != 0)
{
alignSize = (size / alignNum + 1)*alignNum;
}
else
{
alignSize = size;
}
return alignSize;
}*/
// 1-8
//位運(yùn)算提高效率
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
return ((bytes + alignNum - 1) & ~(alignNum - 1));
}
//通過(guò)size大小計(jì)算對(duì)其數(shù)函數(shù);
static inline size_t RoundUp(size_t size)
{
if (size <= 128)
{
return _RoundUp(size, 8);
}
else if (size <= 1024)
{
return _RoundUp(size, 16);
}
else if (size <= 8*1024)
{
return _RoundUp(size, 128);
}
else if (size <= 64*1024)
{
return _RoundUp(size, 1024);
}
else if (size <= 256 * 1024)
{
return _RoundUp(size, 8*1024);
}
else
{
return _RoundUp(size, 1< }
}
/*size_t _Index(size_t bytes, size_t alignNum)
{
if (bytes % alignNum == 0)
{
return bytes / alignNum - 1;
}
else
{
return bytes / alignNum;
}
}*/
//位運(yùn)算提高效率
static inline size_t _Index(size_t bytes, size_t align_shift)
{
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
// 通過(guò)size計(jì)算index位置即映射到哪一個(gè)自由鏈表桶
static inline size_t Index(size_t bytes)
{
assert(bytes <= MAX_BYTES);
// 每個(gè)區(qū)間有多少個(gè)鏈
static int group_array[4] = { 16, 56, 56, 56 };
if (bytes <= 128){
return _Index(bytes, 3);//3表示按8byte對(duì)其
}
else if (bytes <= 1024){
return _Index(bytes - 128, 4) + group_array[0];//4表示按16byte對(duì)其, - 128因?yàn)榍?28個(gè)字節(jié)按照別的對(duì)齊規(guī)則的,剩下的這些按照自己的對(duì)其數(shù)對(duì)其最后+前面總共的桶數(shù)量即可計(jì)算自己的index;
}
else if (bytes <= 8 * 1024){
return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
}
else if (bytes <= 64 * 1024){
return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
}
else if (bytes <= 256 * 1024){
return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
}
else{
assert(false);
}
return -1;
}
// 一次thread cache從中心緩存獲取多少個(gè),池化技術(shù):當(dāng)thread cache沒(méi)有可用obj空間時(shí),會(huì)向中心緩存申請(qǐng)一批而不是一個(gè);
static size_t NumMoveSize(size_t size)
{
assert(size > 0);
// [2, 512],一次批量移動(dòng)多少個(gè)對(duì)象的(慢啟動(dòng))上限值
// 小對(duì)象一次批量上限高
// 小對(duì)象一次批量上限低
int num = MAX_BYTES / size;
if (num < 2)
num = 2;
if (num > 512)
num = 512;
return num;
}
// 計(jì)算一次向系統(tǒng)獲取幾個(gè)頁(yè);當(dāng)centralCache對(duì)應(yīng)index無(wú)可用span時(shí),向pagecache按頁(yè)大小申請(qǐng),之后再把申請(qǐng)的span切割成n個(gè)index大小的obj內(nèi)存塊;
// 單個(gè)對(duì)象 8byte
// ...
// 單個(gè)對(duì)象 256KB
static size_t NumMovePage(size_t size)
{
size_t num = NumMoveSize(size);
size_t npage = num*size;
npage >>= PAGE_SHIFT;
if (npage == 0)
npage = 1;
return npage;
}
};
// 管理多個(gè)連續(xù)頁(yè)大塊內(nèi)存跨度結(jié)構(gòu)
struct Span
{
PAGE_ID _pageId = 0; // 大塊內(nèi)存起始頁(yè)的頁(yè)號(hào);將申請(qǐng)的viod*通過(guò)>>PAGE_SHIFT(8K),映射成數(shù)字方便管理和掛桶;
size_t _n = 0; // 頁(yè)的數(shù)量
Span* _next = nullptr; // 雙向鏈表的結(jié)構(gòu)
Span* _prev = nullptr;
size_t _objSize = 0; // 切好的小對(duì)象的大小
size_t _useCount = 0; // 切好小塊內(nèi)存,被分配給thread cache的計(jì)數(shù),方便回收span,_useCount==0;
void* _freeList = nullptr; // 切好的小塊內(nèi)存的自由鏈表
bool _isUse = false; // 是否在被使用,方便合并span,減少內(nèi)存碎片;
};
// 帶頭雙向循環(huán)鏈表 封裝Span節(jié)點(diǎn)
class SpanList
{
public:
SpanList()
{
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}
//Begin()和End()模擬了帶頭雙向循環(huán)鏈表;
Span* Begin()
{
return _head->_next;
}
Span* End()
{
return _head;
}
bool Empty()
{
return _head->_next == _head;
}
void PushFront(Span* span)
{
Insert(Begin(), span);
}
Span* PopFront()
{
Span* front = _head->_next;
Erase(front);
return front;
}
//“雙向鏈表實(shí)現(xiàn)一個(gè)Insert即可高效復(fù)用”
void Insert(Span* pos, Span* newSpan)
{
assert(pos);
assert(newSpan);
Span* prev = pos->_prev;
// prev newspan pos
prev->_next = newSpan;
newSpan->_prev = prev;
newSpan->_next = pos;
pos->_prev = newSpan;
}
void Erase(Span* pos)
{
assert(pos);
assert(pos != _head);
Span* prev = pos->_prev;
Span* next = pos->_next;
prev->_next = next;
next->_prev = prev;
}
private:
Span* _head;
public:
std::mutex _mtx; // 桶鎖 可能多個(gè)threadcache同時(shí)訪問(wèn)central中的同一個(gè)index桶,加鎖-線(xiàn)程安全
};);
ThreadCache.h
thread cache是哈希桶結(jié)構(gòu),每個(gè)桶是一個(gè)按桶位置映射大小的內(nèi)存塊對(duì)象的自由鏈表。每個(gè)線(xiàn)程都會(huì) 有一個(gè)thread cache對(duì)象,這樣每個(gè)線(xiàn)程在這里獲取對(duì)象和釋放對(duì)象時(shí)是無(wú)鎖的。
申請(qǐng)內(nèi)存:
- 當(dāng)內(nèi)存申請(qǐng)size<=256KB,先獲取到線(xiàn)程本地存儲(chǔ)的thread cache對(duì)象,計(jì)算size映射的哈希桶自 由鏈表下標(biāo)i。
- 如果自由鏈表_freeLists[i]中有對(duì)象,則直接Pop一個(gè)內(nèi)存對(duì)象返回。
- 如果_freeLists[i]中沒(méi)有對(duì)象時(shí),則批量從central cache中獲取一定數(shù)量的對(duì)象,插入到自由鏈表 并返回一個(gè)對(duì)象。
釋放內(nèi)存:
2. 當(dāng)鏈表的長(zhǎng)度過(guò)長(zhǎng),則回收一部分內(nèi)存對(duì)象到central cache。
class ThreadCache
{
public:
// 申請(qǐng)和釋放內(nèi)存對(duì)象
void* Allocate(size_t size);
void Deallocate(void* ptr, size_t size);
// 從中心緩存獲取對(duì)象
void* FetchFromCentralCache(size_t index, size_t size);
// 釋放對(duì)象時(shí),鏈表過(guò)長(zhǎng)時(shí),回收內(nèi)存回到中心緩存
void ListTooLong(FreeList& list, size_t size);
private:
FreeList _freeLists[NFREELIST];
};
// TLS thread local storage TLS技術(shù),使每個(gè)線(xiàn)程里的ThreadCache*獨(dú)享,互不影響,實(shí)現(xiàn)高并發(fā);
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
// 管理切分好的小對(duì)象的自由鏈表
class FreeList
{
public:
void Push(void* obj)
{
assert(obj);
// 頭插
//*(void**)obj = _freeList;
NextObj(obj) = _freeList;
_freeList = obj;
++_size;
}
void PushRange(void* start, void* end, size_t n)
{
NextObj(end) = _freeList;
_freeList = start;
_size += n;
}
void PopRange(void*& start, void*& end, size_t n)
{
assert(n >= _size);
start = _freeList;
end = start;
for (size_t i = 0; i < n - 1; ++i)
{
end = NextObj(end);
}
_freeList = NextObj(end);
NextObj(end) = nullptr;
_size -= n;
}
void* Pop()
{
assert(_freeList);
// 頭刪
void* obj = _freeList;
_freeList = NextObj(obj);
--_size;
return obj;
}
bool Empty()
{
return _freeList == nullptr;
}
size_t& MaxSize()
{
return _maxSize;
}
size_t Size()
{
return _size;
}
private:
void* _freeList = nullptr;
size_t _maxSize = 1;
size_t _size = 0;
};
ThreadCache.cpp
#include "CentralCache.h"
//從thradcache 從 central中要內(nèi)存
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
// 慢開(kāi)始反饋調(diào)節(jié)算法
// 1、最開(kāi)始不會(huì)一次向central cache一次批量要太多,因?yàn)橐嗔丝赡苡貌煌?br /> // 2、如果你不要這個(gè)size大小內(nèi)存需求,那么batchNum就會(huì)不斷增長(zhǎng),直到上限
// 3、size越大,一次向central cache要的batchNum就越小
// 4、size越小,一次向central cache要的batchNum就越大
size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
if (_freeLists[index].MaxSize() == batchNum)
{
_freeLists[index].MaxSize() += 1;
}
//輸出型參數(shù) 將batchNum個(gè)obj小空間從centralcache中帶出來(lái);
void* start = nullptr;
void* end = nullptr;
size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
assert(actualNum > 0);
if (actualNum == 1)//就申請(qǐng)到一個(gè)obj空間 直接返回給申請(qǐng)的人用
{
assert(start == end);
return start;
}
else//申請(qǐng)了多個(gè)obj空間 只返回第一個(gè) 則 剩下的插入_freeList留著后面?zhèn)溆?
{
_freeLists[index].PushRange(NextObj(start), end, actualNum-1);
return start;
}
}
//多線(xiàn)程申請(qǐng)
void* ThreadCache::Allocate(size_t size)
{
assert(size <= MAX_BYTES);
size_t alignSize = SizeClass::RoundUp(size);//計(jì)算對(duì)齊數(shù) 相當(dāng)于需要obj的size
size_t index = SizeClass::Index(size);//計(jì)算哈希桶index
if (!_freeLists[index].Empty())//還有剩余直接pop拿走一個(gè)
{
return _freeLists[index].Pop();
}
else
{
return FetchFromCentralCache(index, alignSize);//沒(méi)了 從central中要
}
}
//多線(xiàn)程釋放
void ThreadCache::Deallocate(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAX_BYTES);
// 找對(duì)映射的自由鏈表桶,對(duì)象插入進(jìn)入
size_t index = SizeClass::Index(size);
_freeLists[index].Push(ptr);
//釋放完了檢查下freelist里的obj,看看需不需要回收到centralcache
// 設(shè)定:當(dāng)鏈表長(zhǎng)度大于一次批量申請(qǐng)的內(nèi)存時(shí)就開(kāi)始還一段list給central cache
if (_freeLists[index].Size() >= _freeLists[index].MaxSize())
{
ListTooLong(_freeLists[index], size);
}
}
//一段list給central cache
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
//輸出型參數(shù);
void* start = nullptr;
void* end = nullptr;
//先從_freeList中彈出
list.PopRange(start, end, list.MaxSize());
//再收回到central中對(duì)應(yīng)index的Span鏈表中
CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
CentralCache.h
central cache也是一個(gè)哈希桶結(jié)構(gòu),他的哈希桶的映射關(guān)系跟thread cache是一樣的。不同的是他的每 個(gè)哈希桶位置掛是SpanList鏈表結(jié)構(gòu),不過(guò)每個(gè)映射桶下面的span中的大內(nèi)存塊被按映射關(guān)系切成了一 個(gè)個(gè)小內(nèi)存塊對(duì)象掛在span的自由鏈表中。
申請(qǐng)內(nèi)存:
1.當(dāng)thread cache中沒(méi)有內(nèi)存時(shí),就會(huì)批量向central cache申請(qǐng)一些內(nèi)存對(duì)象,這里的批量獲取對(duì) 象的數(shù)量使用了類(lèi)似網(wǎng)絡(luò)tcp協(xié)議擁塞控制的慢開(kāi)始算法;central cache也有一個(gè)哈希映射的 spanlist,spanlist中掛著span,從span中取出對(duì)象給thread cache,這個(gè)過(guò)程是需要加鎖的,不 過(guò)這里使用的是一個(gè)桶鎖,盡可能提高效率。
2.central cache映射的spanlist中所有span的都沒(méi)有內(nèi)存以后,則需要向page cache申請(qǐng)一個(gè)新的 span對(duì)象,拿到span以后將span管理的內(nèi)存按大小切好作為自由鏈表鏈接到一起。然后從span 中取對(duì)象給thread cache。
3.central cache的中掛的span中use_count記錄分配了多少個(gè)對(duì)象出去,分配一個(gè)對(duì)象給thread cache,就++use_count。
釋放內(nèi)存:
1.當(dāng)thread_cache過(guò)長(zhǎng)或者線(xiàn)程銷(xiāo)毀,則會(huì)將內(nèi)存釋放回central cache中的,釋放回來(lái)時(shí)-- use_count。當(dāng)use_count減到0時(shí)則表示所有對(duì)象都回到了span,則將span釋放回page cache, page cache中會(huì)對(duì)前后相鄰的空閑頁(yè)進(jìn)行合并。
#include "Common.h"
// 單例模式(餓漢模式)全局就一個(gè)CentralCache;
class CentralCache
{
public:
static CentralCache* GetInstance()
{
return &_sInst;
}
// 獲取一個(gè)非空的span
Span* GetOneSpan(SpanList& list, size_t byte_size);
// 從中心緩存獲取一定數(shù)量的對(duì)象給thread cache
size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);
// 將一定數(shù)量的obj釋放到span跨度
void ReleaseListToSpans(void* start, size_t byte_size);
private:
SpanList _spanLists[NFREELIST];
private:
CentralCache()
{}
CentralCache(const CentralCache&) = delete;
static CentralCache _sInst;
};
CentralCache.cpp
//#include "PageCache.h"
CentralCache CentralCache::_sInst;
// 獲取一個(gè)非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)//參數(shù)list是某一個(gè)確定的index桶
{
// 查看當(dāng)前的spanlist中是否有還有未分配對(duì)象的span
Span* it = list.Begin();
while (it != list.End())
{
if (it->_freeList != nullptr)
{
return it;//這里不需要改it的span中的屬性,因?yàn)榈茸詈蠓纸othreadcache了obj以后 才算其中的內(nèi)存被分出去了 里面的usecount等才需要改;
}
else
{
it = it->_next;
}
}
// 走到這里說(shuō)沒(méi)有空閑span了,只能找page cache要
// 先把central cache的桶鎖解掉,這樣如果其他線(xiàn)程釋放內(nèi)存對(duì)象回來(lái),不會(huì)阻塞(你要從page申請(qǐng)了 不能別的線(xiàn)程在這個(gè)桶釋放)
list._mtx.unlock();
PageCache::GetInstance()->_pageMtx.lock();//整個(gè)pagecache結(jié)構(gòu)可能會(huì)從index1~index129挨個(gè)操作每個(gè)桶 因此需要上大鎖;
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
span->_isUse = true;//分跟central的span標(biāo)記已使用因?yàn)轳R上就要切分給obj用了
span->_objSize = size;//每個(gè)span掛的固定小內(nèi)存塊obj大小size
PageCache::GetInstance()->_pageMtx.unlock();
// 對(duì)獲取span進(jìn)行切分,不需要加鎖,因?yàn)檫@會(huì)其他線(xiàn)程訪問(wèn)不到這個(gè)span
// 計(jì)算span的大塊內(nèi)存的起始地址和大塊內(nèi)存的大小(字節(jié)數(shù))
char* start = (char*)(span->_pageId << PAGE_SHIFT);//這里的_pageId是從底層按頁(yè)申請(qǐng)內(nèi)存的時(shí)候地址轉(zhuǎn)換來(lái)的,現(xiàn)在需要用地址就轉(zhuǎn)換回去;
size_t bytes = span->_n << PAGE_SHIFT;
char* end = start + bytes;
// 把大塊內(nèi)存切成自由鏈表鏈接起來(lái)
// 1、先切一塊下來(lái)去做頭,方便尾插(尾插原因,切出來(lái)一般是連續(xù)的,那么尾插給到span上掛小obj也是連續(xù),提高內(nèi)存命中率)
span->_freeList = start;
start += size;
void* tail = span->_freeList;
int i = 1;
while (start < end)
{
++i;
NextObj(tail) = start;
tail = NextObj(tail); // tail = start;
start += size;
}
NextObj(tail) = nullptr;
// 切好span以后,需要把span掛到桶里面去的時(shí)候,再加鎖
list._mtx.lock();
list.PushFront(span);
return span;
}
// 從中心緩存獲取一定數(shù)量的對(duì)象給thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
size_t index = SizeClass::Index(size);
_spanLists[index]._mtx.lock();//上桶鎖
Span* span = GetOneSpan(_spanLists[index],size);
assert(span);
assert(span->_freeList);
// 從span中獲取batchNum個(gè)對(duì)象
// 如果不夠batchNum個(gè),有多少拿多少
start = span->_freeList;
end = start;
int n = 1;//n為實(shí)際拿到的數(shù)量,start直接給了所以起始值為1;
for (int i = 0; i < batchNum - 1; i++)
{
if (NextObj(end) == nullptr) break;
end = NextObj(end);
++n;
}
//span被切出去給obj使用了 span的一些屬性得改變了;
span->_useCount += n;
span->_freeList = NextObj(end);
NextObj(end) = nullptr;
_spanLists[index]._mtx.unlock();
return n;
}
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
size_t index = SizeClass::Index(size);
_spanLists[index]._mtx.lock();
while (start)
{
void* next = NextObj(start);
Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
//小obj頭插入span中的_freeList
NextObj(start) = span->_freeList;
span->_freeList = start;
span->_useCount--;
// 說(shuō)明span的切分出去的所有小塊內(nèi)存都回來(lái)了
// 這個(gè)span就可以再回收給page cache,pagecache可以再?lài)L試去做前后頁(yè)的合并
if (span->_useCount == 0)
{
_spanLists[index].Erase(span);
span->_freeList = nullptr;
span->_next = nullptr;
span->_prev = nullptr;
// 釋放span給page cache時(shí),使用page cache的鎖就可以了
// 這時(shí)把桶鎖解掉 不影響其他線(xiàn)程對(duì)該index的central操作;
_spanLists[index]._mtx.unlock();
PageCache::GetInstance()->_pageMtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);//還給page和page是否需要合并其中的span減少內(nèi)存碎片都在這函數(shù)里
PageCache::GetInstance()->_pageMtx.unlock();
_spanLists[index]._mtx.lock();
}
start = next;
}
_spanLists[index]._mtx.unlock();
}
PageMap.h
使用自己定義的PageMap哈希直接映射,避免stl的線(xiàn)程安全,提高效率;
#include"Common.h"
// Single-level array
template
class TCMalloc_PageMap1 {
private:
static const int LENGTH = 1 << BITS;
void** array_;
public:
typedef uintptr_t Number;
//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {
explicit TCMalloc_PageMap1() {
//array_ = reinterpret_cast((*allocator)(sizeof(void*) << BITS));
size_t size = sizeof(void*) << BITS;
size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);
array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
memset(array_, 0, sizeof(void*) << BITS);
}
// Return the current value for KEY. Returns NULL if not yet set,
// or if k is out of range.
void* get(Number k) const {
if ((k >> BITS) > 0) {
return NULL;
}
return array_[k];
}
// REQUIRES "k" is in range "[0,2^BITS-1]".
// REQUIRES "k" has been ensured before.
//
// Sets the value 'v' for key 'k'.
void set(Number k, void* v) {
array_[k] = v;
}
};
// Two-level radix tree
template
class TCMalloc_PageMap2 {
private:
// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
static const int ROOT_BITS = 5;
static const int ROOT_LENGTH = 1 << ROOT_BITS;
static const int LEAF_BITS = BITS - ROOT_BITS;
static const int LEAF_LENGTH = 1 << LEAF_BITS;
// Leaf node
struct Leaf {
void* values[LEAF_LENGTH];
};
Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodes
void* (*allocator_)(size_t); // Memory allocator
public:
typedef uintptr_t Number;
//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
explicit TCMalloc_PageMap2() {
//allocator_ = allocator;
memset(root_, 0, sizeof(root_));
PreallocateMoreMemory();
}
void* get(Number k) const {
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH - 1);
if ((k >> BITS) > 0 || root_[i1] == NULL) {
return NULL;
}
return root_[i1]->values[i2];
}
void set(Number k, void* v) {
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH - 1);
ASSERT(i1 < ROOT_LENGTH);
root_[i1]->values[i2] = v;
}
bool Ensure(Number start, size_t n) {
for (Number key = start; key <= start + n - 1;) {
const Number i1 = key >> LEAF_BITS;
// Check for overflow
if (i1 >= ROOT_LENGTH)
return false;
// Make 2nd level node if necessary
if (root_[i1] == NULL) {
//Leaf* leaf = reinterpret_cast((*allocator_)(sizeof(Leaf)));
//if (leaf == NULL) return false;
static ObjectPool leafPool;
Leaf* leaf = (Leaf*)leafPool.New();
memset(leaf, 0, sizeof(*leaf));
root_[i1] = leaf;
}
// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}
void PreallocateMoreMemory() {
// Allocate enough to keep track of all possible pages
Ensure(0, 1 << BITS);
}
};
// Three-level radix tree
template
class TCMalloc_PageMap3 {
private:
// How many bits should we consume at each interior level
static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up
static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;
// How many bits should we consume at leaf level
static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;
static const int LEAF_LENGTH = 1 << LEAF_BITS;
// Interior node
struct Node {
Node* ptrs[INTERIOR_LENGTH];
};
// Leaf node
struct Leaf {
void* values[LEAF_LENGTH];
};
Node* root_; // Root of radix tree
void* (*allocator_)(size_t); // Memory allocator
Node* NewNode() {
Node* result = reinterpret_cast((*allocator_)(sizeof(Node)));
if (result != NULL) {
memset(result, 0, sizeof(*result));
}
return result;
}
public:
typedef uintptr_t Number;
explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {
allocator_ = allocator;
root_ = NewNode();
}
void* get(Number k) const {
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
const Number i3 = k & (LEAF_LENGTH - 1);
if ((k >> BITS) > 0 ||
root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {
return NULL;
}
return reinterpret_cast(root_->ptrs[i1]->ptrs[i2])->values[i3];
}
void set(Number k, void* v) {
ASSERT(k >> BITS == 0);
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
const Number i3 = k & (LEAF_LENGTH - 1);
reinterpret_cast(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;
}
bool Ensure(Number start, size_t n) {
for (Number key = start; key <= start + n - 1;) {
const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
// Check for overflow
if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
return false;
// Make 2nd level node if necessary
if (root_->ptrs[i1] == NULL) {
Node* n = NewNode();
if (n == NULL) return false;
root_->ptrs[i1] = n;
}
// Make leaf node if necessary
if (root_->ptrs[i1]->ptrs[i2] == NULL) {
Leaf* leaf = reinterpret_cast((*allocator_)(sizeof(Leaf)));
if (leaf == NULL) return false;
memset(leaf, 0, sizeof(*leaf));
root_->ptrs[i1]->ptrs[i2] = reinterpret_cast(leaf);
}
// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}
void PreallocateMoreMemory() {
}
};*>*>*>*>*>*>**>
PageCache.h
申請(qǐng)內(nèi)存:
- 當(dāng)central cache向page cache申請(qǐng)內(nèi)存時(shí),page cache先檢查對(duì)應(yīng)位置有沒(méi)有span,如果沒(méi)有 則向更大頁(yè)尋找一個(gè)span,如果找到則分裂成兩個(gè)。比如:申請(qǐng)的是4頁(yè)page,4頁(yè)page后面沒(méi) 有掛span,則向后面尋找更大的span,假設(shè)在10頁(yè)page位置找到一個(gè)span,則將10頁(yè)page span分裂為一個(gè)4頁(yè)page span和一個(gè)6頁(yè)page span。
- 如果找到_spanList[128]都沒(méi)有合適的span,則向系統(tǒng)使用mmap、brk或者是VirtualAlloc等方式 申請(qǐng)128頁(yè)page span掛在自由鏈表中,再重復(fù)1中的過(guò)程。 3
- 需要注意的是central cache和page cache 的核心結(jié)構(gòu)都是spanlist的哈希桶,但是他們是有本質(zhì) 區(qū)別的,central cache中哈希桶,是按跟thread cache一樣的大小對(duì)齊關(guān)系映射的,他的spanlist 中掛的span中的內(nèi)存都被按映射關(guān)系切好鏈接成小塊內(nèi)存的自由鏈表。而page cache 中的 spanlist則是按下標(biāo)桶號(hào)映射的,也就是說(shuō)第i號(hào)桶中掛的span都是i頁(yè)內(nèi)存。
釋放內(nèi)存:
如果central cache釋放回一個(gè)span,則依次尋找span的前后page id的沒(méi)有在使用的空閑span ,看是否可以合并,如果合并繼續(xù)向前尋找。這樣就可以將切小的內(nèi)存合并收縮成大的span,減少內(nèi)存碎片。
#include "Common.h"
#include "ObjectPool.h"
#include "PageMap.h"
class PageCache
{
public:
static PageCache* GetInstance()
{
return &_sInst;
}
// 獲取從對(duì)象到span的映射
Span* MapObjectToSpan(void* obj);
// 釋放空閑span回到Pagecache,并合并相鄰的span
void ReleaseSpanToPageCache(Span* span);
// 獲取一個(gè)K頁(yè)的span
Span* NewSpan(size_t k);
std::mutex _pageMtx;
private:
SpanList _spanLists[NPAGES];
ObjectPool _spanPool;//這里用上了之前編寫(xiě)的ObjectPool定長(zhǎng)內(nèi)存池 用來(lái)New(span)脫離malloc
//std::unordered_map _idSpanMap;
//std::map _idSpanMap;
TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;
PageCache()
{}
PageCache(const PageCache&) = delete;
static PageCache _sInst;
};,>,>
PageCache.cpp
PageCache PageCache::_sInst;
// 獲取一個(gè)K頁(yè)的span
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0);
// 大于128 page的直接向堆申請(qǐng)
if (k > NPAGES - 1)
{
void* ptr = SystemAlloc(k);
//Span* span = new Span;
Span* span = _spanPool.New();
span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
span->_n = k;
//_idSpanMap[span->_pageId] = span;
_idSpanMap.set(span->_pageId, span);
return span;
}
// 先檢查第k個(gè)桶里面有沒(méi)有span
if (!_spanLists[k].Empty())
{
Span* kSpan = _spanLists[k].PopFront();
// 建立id和span的映射,方便central cache回收小塊內(nèi)存時(shí),查找對(duì)應(yīng)的span
for (PAGE_ID i = 0; i < kSpan->_n; ++i)
{
//_idSpanMap[kSpan->_pageId + i] = kSpan;
_idSpanMap.set(kSpan->_pageId + i, kSpan);
}
return kSpan;
}
// 檢查一下后面的桶里面有沒(méi)有span,如果有可以把他它進(jìn)行切分
for (size_t i = k + 1; i < NPAGES; ++i)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();//nSpan代表切割以后剩下的span,他還是未使用的,還在pagecache中是連續(xù)的,所以映射首尾即可
//Span* kSpan = new Span;
Span* kSpan = _spanPool.New();//kSpan代表從某個(gè)大內(nèi)存n+k Span中切出來(lái)的kSpan,他要給到central之后進(jìn)行obj切割,進(jìn)而往thread給;
// 在nSpan的頭部切一個(gè)k頁(yè)下來(lái)
// k頁(yè)span返回
// nSpan再掛到對(duì)應(yīng)映射的位置
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
_spanLists[nSpan->_n].PushFront(nSpan);
// 存儲(chǔ)nSpan的首尾頁(yè)號(hào)跟nSpan映射,方便page cache回收內(nèi)存時(shí)進(jìn)行的合并查找
//_idSpanMap[nSpan->_pageId] = nSpan;
//_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
_idSpanMap.set(nSpan->_pageId, nSpan);
_idSpanMap.set(nSpan->_pageId + nSpan->_n - 1, nSpan);
// 建立id和kSpan的映射,方便central cache回收小塊內(nèi)存時(shí),查找對(duì)應(yīng)的span
for (PAGE_ID i = 0; i < kSpan->_n; ++i)
{
//_idSpanMap[kSpan->_pageId + i] = kSpan;
_idSpanMap.set(kSpan->_pageId + i, kSpan);
}
return kSpan;
}
}
// 走到這個(gè)位置就說(shuō)明后面沒(méi)有大頁(yè)的span了
// 這時(shí)就去找堆要一個(gè)128頁(yè)的span
//Span* bigSpan = new Span;
Span* bigSpan = _spanPool.New();
void* ptr = SystemAlloc(NPAGES - 1);
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
_spanLists[bigSpan->_n].PushFront(bigSpan);
return NewSpan(k);
}
//將一個(gè)obj小內(nèi)存塊它對(duì)應(yīng)的Span*找到(通過(guò)轉(zhuǎn)換成page_id再map出Span*)
Span* PageCache::MapObjectToSpan(void* obj)
{
PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
/*std::unique_lock lock(_pageMtx);
auto ret = _idSpanMap.find(id);
if (ret != _idSpanMap.end())
{
return ret->second;
}
else
{
assert(false);
return nullptr;
}*/
auto ret = (Span*)_idSpanMap.get(id);
assert(ret != nullptr);
return ret;
}
//從central向page歸還span大塊內(nèi)存(這個(gè)Span的 usecount==0了即分出去的小obj都還回來(lái)了),歸還Span并嘗試合并!
void PageCache::ReleaseSpanToPageCache(Span* span)
{
// 大于128 page的直接還給堆
if (span->_n > NPAGES - 1)
{
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
SystemFree(ptr);
//delete span;
_spanPool.Delete(span);
return;
}
// 對(duì)span前后的頁(yè),嘗試進(jìn)行合并,緩解內(nèi)存碎片問(wèn)題
while (1)
{
PAGE_ID prevId = span->_pageId - 1;
// 前面的頁(yè)號(hào)沒(méi)有,不合并了
auto ret = (Span*)_idSpanMap.get(prevId);
if (ret == nullptr)
{
break;
}
// 前面相鄰頁(yè)的span在使用,不合并了
Span* prevSpan = ret;
if (prevSpan->_isUse == true)
{
break;
}
// 合并出超過(guò)128頁(yè)的span沒(méi)辦法管理,不合并了
if (prevSpan->_n + span->_n > NPAGES - 1)
{
break;
}
span->_pageId = prevSpan->_pageId;
span->_n += prevSpan->_n;
_spanLists[prevSpan->_n].Erase(prevSpan);
//delete prevSpan;
_spanPool.Delete(prevSpan);
}
// 向后合并
while (1)
{
PAGE_ID nextId = span->_pageId + span->_n;
/*auto ret = _idSpanMap.find(nextId);
if (ret == _idSpanMap.end())
{
break;
}*/
auto ret = (Span*)_idSpanMap.get(nextId);
if (ret == nullptr)
{
break;
}
Span* nextSpan = ret;
if (nextSpan->_isUse == true)
{
break;
}
if (nextSpan->_n + span->_n > NPAGES - 1)
{
break;
}
span->_n += nextSpan->_n;
_spanLists[nextSpan->_n].Erase(nextSpan);
//delete nextSpan;
_spanPool.Delete(nextSpan);
}
//合并的大內(nèi)存塊 插回Pagecache中 設(shè)置為未使用(未分到central中)狀態(tài);
_spanLists[span->_n].PushFront(span);
span->_isUse = false;
//_idSpanMap[span->_pageId] = span;
//_idSpanMap[span->_pageId+span->_n-1] = span;
//記錄前后頁(yè)號(hào)方便下次合并;
_idSpanMap.set(span->_pageId, span);
_idSpanMap.set(span->_pageId + span->_n - 1, span);
}
ConcurrentAlloc.h
#include "Common.h"
#include "ThreadCache.h"
#include "PageCache.h"
#include "ObjectPool.h"
//tcmallo
static void* ConcurrentAlloc(size_t size)
{
//一次申請(qǐng)內(nèi)存大于thread最大的256kb,則直接向page層按頁(yè)直接申請(qǐng),不需要經(jīng)過(guò)Thread層;
if (size > MAX_BYTES)
{
size_t alignSize = SizeClass::RoundUp(size);
size_t kpage = alignSize >> PAGE_SHIFT;
PageCache::GetInstance()->_pageMtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(kpage);
span->_objSize = size;//這個(gè)span就是為了_objsize單位內(nèi)存而申請(qǐng)的
PageCache::GetInstance()->_pageMtx.unlock();
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
return ptr;
}
else
{
// 通過(guò)TLS 每個(gè)線(xiàn)程無(wú)鎖的獲取自己的專(zhuān)屬的ThreadCache對(duì)象
if (pTLSThreadCache == nullptr)
{
static ObjectPool tcPool;//只需要一個(gè)tcpool,用于調(diào)用New,所以設(shè)置成靜態(tài);
//pTLSThreadCache = new ThreadCache;
pTLSThreadCache = tcPool.New();
}
return pTLSThreadCache->Allocate(size);
}
}
//tcfree
static void ConcurrentFree(void* ptr)
{
Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
size_t size = span->_objSize;
if (size > MAX_BYTES)//證明這個(gè)Span直接是按頁(yè)從page申請(qǐng)的_objsize>MAX_SIZE,沒(méi)經(jīng)過(guò)thread,那么直接ReleaseSpanToPageCache
{
PageCache::GetInstance()->_pageMtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMtx.unlock();
}
else
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr, size);
}
}
5.測(cè)試文件:
Test.cpp
// ntimes 一輪申請(qǐng)和釋放內(nèi)存的次數(shù)
// rounds 輪次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector vthread(nworks);
std::atomic malloc_costtime = 0;
std::atomic free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&, k]() {
std::vector v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
//v.push_back(malloc(16));
v.push_back(malloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
free(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
printf("%u個(gè)線(xiàn)程并發(fā)執(zhí)行%u輪次,每輪次malloc %u次: 花費(fèi):%u msn",
nworks, rounds, ntimes, malloc_costtime.load());
printf("%u個(gè)線(xiàn)程并發(fā)執(zhí)行%u輪次,每輪次free %u次: 花費(fèi):%u msn",
nworks, rounds, ntimes, free_costtime.load());
printf("%u個(gè)線(xiàn)程并發(fā)malloc&free %u次,總計(jì)花費(fèi):%u msn",
nworks, nworks*rounds*ntimes, malloc_costtime.load() + free_costtime.load());
}
// 單輪次申請(qǐng)釋放次數(shù) 線(xiàn)程數(shù) 輪次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector vthread(nworks);
std::atomic malloc_costtime = 0;
std::atomic free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&]() {
std::vector v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
//v.push_back(ConcurrentAlloc(16));
v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
ConcurrentFree(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
printf("%u個(gè)線(xiàn)程并發(fā)執(zhí)行%u輪次,每輪次concurrent alloc %u次: 花費(fèi):%u msn",
nworks, rounds, ntimes, malloc_costtime.load());
printf("%u個(gè)線(xiàn)程并發(fā)執(zhí)行%u輪次,每輪次concurrent dealloc %u次: 花費(fèi):%u msn",
nworks, rounds, ntimes, free_costtime.load());
printf("%u個(gè)線(xiàn)程并發(fā)concurrent alloc&dealloc %u次,總計(jì)花費(fèi):%u msn",
nworks, nworks*rounds*ntimes.load(), malloc_costtime + free_costtime.load());
}
int main()
{
size_t n = 10000;
cout << "==========================================================" << endl;
BenchmarkConcurrentMalloc(n, 4, 10);
cout << endl << endl;
BenchmarkMalloc(n, 4, 10);
cout << "==========================================================" << endl;
return 0;
}*>*>
malloc與tcmallo測(cè)試對(duì)比結(jié)果
總結(jié)
在整體框架編寫(xiě)完畢后,進(jìn)行測(cè)試對(duì)比并不能體現(xiàn)tcmalloc比malloc效率高,如下圖;但是在tcmalloc進(jìn)行 map(Span*,PAGE_ID),這個(gè)映射采用基數(shù)樹(shù)的優(yōu)化后效率大大提高,超過(guò)了malloc。
不高效存在性能瓶頸的原因
根據(jù)測(cè)試,map的find越慢,會(huì)導(dǎo)致其lock()越慢,因此map中的find占了整體的15%,加鎖解鎖的過(guò)程占據(jù)了程序運(yùn)行的20%times,這就是未優(yōu)化的性能瓶頸;優(yōu)化前Span*和page_id的映射使用stl標(biāo)準(zhǔn)庫(kù)中的unorder_map,是非線(xiàn)程安全的,需要加鎖保證線(xiàn)程安全; 優(yōu)化了后使用PAGE_MAP 設(shè)計(jì)原理是直接開(kāi)滿(mǎn)page_id所有范圍的空間,采取直接映射;這樣即便是多線(xiàn)程也不需要加鎖;
不加鎖的原因
1.兩個(gè)線(xiàn)程對(duì)map中不同的位置讀寫(xiě):因?yàn)閮蓚€(gè)線(xiàn)程在對(duì)基數(shù)樹(shù)map一個(gè)讀,一個(gè)寫(xiě)的時(shí)候,相應(yīng)空間早都開(kāi)好了,不會(huì)改變結(jié)構(gòu),因此讀寫(xiě)互不影響,一個(gè)在自己的地方讀,一個(gè)在自己的地方寫(xiě),而STL容器中map紅黑樹(shù)寫(xiě)入會(huì)旋轉(zhuǎn),unorder_map哈希寫(xiě)入也涉及增容,結(jié)構(gòu)有可能會(huì)改變,因此兩個(gè)位置的讀寫(xiě)有可能互相影響導(dǎo)致線(xiàn)程不安全;
2.兩個(gè)線(xiàn)程對(duì)map中相同的位置寫(xiě):因?yàn)閷?xiě)只會(huì)在NewSpan和ReleaseSpanToPageCache中,一個(gè)是central沒(méi)有span了從pagecache申請(qǐng),一個(gè)是central 把沒(méi)使用的usecount==0的span大塊內(nèi)存還給paghecache,因?yàn)閜age只有有span和無(wú)span兩種狀態(tài)這兩個(gè)函數(shù)不可能在兩個(gè)線(xiàn)程中同時(shí)執(zhí)行
3.兩個(gè)線(xiàn)程對(duì)map中相同的位置讀:因?yàn)樽x只會(huì)在ConcurrentFree和ReleaseListToSpans中,對(duì)于同一個(gè)位置index,肯定是先ConcurrentFree(void*obj),之后objfree到一定的量(_freeLists[index].Size() >= _freeLists[index].MaxSize(慢開(kāi)始的maxsize))時(shí),再調(diào)用ReleaseListToSpans將freelist[index]從threadcache還給centralcache的,也不會(huì)兩個(gè)線(xiàn)程同時(shí)發(fā)生;
- stl的map中的find會(huì)依次遍歷O(n),而直接映射的時(shí)間效率是O(1),相當(dāng)于以部分空間換取時(shí)間的一種舉措;
優(yōu)化了stl中map.find()慢的問(wèn)題同時(shí)不需要加鎖了,兩個(gè)主要問(wèn)題在犧牲點(diǎn)空間的情況下完美解決,因此性能瓶頸得到優(yōu)化,tcmalloc高效與malloc。
-
內(nèi)存
+關(guān)注
關(guān)注
8文章
3081瀏覽量
74592 -
操作系統(tǒng)
+關(guān)注
關(guān)注
37文章
6941瀏覽量
124155 -
程序
+關(guān)注
關(guān)注
117文章
3806瀏覽量
81708 -
線(xiàn)程
+關(guān)注
關(guān)注
0文章
507瀏覽量
19855
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
詳解內(nèi)存池技術(shù)的原理與實(shí)現(xiàn)

C++內(nèi)存池的設(shè)計(jì)與實(shí)現(xiàn)
【每日一練】第十六節(jié):內(nèi)存池的使用
內(nèi)存池可以調(diào)節(jié)內(nèi)存的大小嗎
內(nèi)存池的概念和實(shí)現(xiàn)原理概述
關(guān)于RT-Thread內(nèi)存管理的內(nèi)存池簡(jiǎn)析
RT-Thread操作系統(tǒng)中靜態(tài)內(nèi)存池的創(chuàng)建與使用
RT-Thread內(nèi)存管理之內(nèi)存池實(shí)現(xiàn)分析
刪除靜態(tài)內(nèi)存池是用rt_mp_detach還是rt_mp_delete
Linux 內(nèi)存池源碼淺析
高并發(fā)內(nèi)存池項(xiàng)目實(shí)現(xiàn)

如何實(shí)現(xiàn)一個(gè)高性能內(nèi)存池

nginx內(nèi)存池源碼設(shè)計(jì)

內(nèi)存池主要解決的問(wèn)題

評(píng)論