數(shù)據(jù)庫與緩存讀寫模式策略
寫完數(shù)據(jù)庫后是否需要馬上更新緩存還是直接刪除緩存?
(1)、如果寫數(shù)據(jù)庫的值與更新到緩存值是一樣的,不需要經(jīng)過任何的計算,可以馬上更新緩存,但是如果對于那種寫數(shù)據(jù)頻繁而讀數(shù)據(jù)少的場景并不合適這種解決方案,因為也許還沒有查詢就被刪除或修改了,這樣會浪費時間和資源
(2)、如果寫數(shù)據(jù)庫的值與更新緩存的值不一致,寫入緩存中的數(shù)據(jù)需要經(jīng)過幾個表的關(guān)聯(lián)計算后得到的結(jié)果插入緩存中,那就沒有必要馬上更新緩存,只有刪除緩存即可,等到查詢的時候在去把計算后得到的結(jié)果插入到緩存中即可。
所以一般的策略是當(dāng)更新數(shù)據(jù)時,先刪除緩存數(shù)據(jù),然后更新數(shù)據(jù)庫,而不是更新緩存,等要查詢的時候才把最新的數(shù)據(jù)更新到緩存
數(shù)據(jù)庫與緩存雙寫情況下導(dǎo)致數(shù)據(jù)不一致問題
場景一
當(dāng)更新數(shù)據(jù)時,如更新某商品的庫存,當(dāng)前商品的庫存是100,現(xiàn)在要更新為99,先更新數(shù)據(jù)庫更改成99,然后刪除緩存,發(fā)現(xiàn)刪除緩存失敗了,這意味著數(shù)據(jù)庫存的是99,而緩存是100,這導(dǎo)致數(shù)據(jù)庫和緩存不一致。
場景一解決方案
這種情況應(yīng)該是先刪除緩存,然后在更新數(shù)據(jù)庫,如果刪除緩存失敗,那就不要更新數(shù)據(jù)庫,如果說刪除緩存成功,而更新數(shù)據(jù)庫失敗,那查詢的時候只是從數(shù)據(jù)庫里查了舊的數(shù)據(jù)而已,這樣就能保持?jǐn)?shù)據(jù)庫與緩存的一致性。
場景二
在高并發(fā)的情況下,如果當(dāng)刪除完緩存的時候,這時去更新數(shù)據(jù)庫,但還沒有更新完,另外一個請求來查詢數(shù)據(jù),發(fā)現(xiàn)緩存里沒有,就去數(shù)據(jù)庫里查,還是以上面商品庫存為例,如果數(shù)據(jù)庫中產(chǎn)品的庫存是100,那么查詢到的庫存是100,然后插入緩存,插入完緩存后,原來那個更新數(shù)據(jù)庫的線程把數(shù)據(jù)庫更新為了99,導(dǎo)致數(shù)據(jù)庫與緩存不一致的情況
場景二解決方案
遇到這種情況,可以用隊列的去解決這個問,創(chuàng)建幾個隊列,如20個,根據(jù)商品的ID去做hash值,然后對隊列個數(shù)取摸,當(dāng)有數(shù)據(jù)更新請求時,先把它丟到隊列里去,當(dāng)更新完后在從隊列里去除,如果在更新的過程中,遇到以上場景,先去緩存里看下有沒有數(shù)據(jù),如果沒有,可以先去隊列里看是否有相同商品ID在做更新,如果有也把查詢的請求發(fā)送到隊列里去,然后同步等待緩存更新完成。
這里有一個優(yōu)化點,如果發(fā)現(xiàn)隊列里有一個查詢請求了,那么就不要放新的查詢操作進(jìn)去了,用一個while(true)循環(huán)去查詢緩存,循環(huán)個200MS左右,如果緩存里還沒有則直接取數(shù)據(jù)庫的舊數(shù)據(jù),一般情況下是可以取到的。
在高并發(fā)下解決場景二要注意的問題
(1)讀請求時長阻塞
由于讀請求進(jìn)行了非常輕度的異步化,所以一定要注意讀超時的問題,每個讀請求必須在超時間內(nèi)返回,該解決方案最大的風(fēng)險在于可能數(shù)據(jù)更新很頻繁,導(dǎo)致隊列中擠壓了大量的更新操作在里面,然后讀請求會發(fā)生大量的超時,最后導(dǎo)致大量的請求直接走數(shù)據(jù)庫,像遇到這種情況,一般要做好足夠的壓力測試,如果壓力過大,需要根據(jù)實際情況添加機器。
(2)請求并發(fā)量過高
這里還是要做好壓力測試,多模擬真實場景,并發(fā)量在最高的時候QPS多少,扛不住就要多加機器,還有就是做好讀寫比例是多少
(3)多服務(wù)實例部署的請求路由
可能這個服務(wù)部署了多個實例,那么必須保證說,執(zhí)行數(shù)據(jù)更新操作,以及執(zhí)行緩存更新操作的請求,都通過nginx服務(wù)器路由到相同的服務(wù)實例上
(4)熱點商品的路由問題,導(dǎo)致請求的傾斜
某些商品的讀請求特別高,全部打到了相同的機器的相同丟列里了,可能造成某臺服務(wù)器壓力過大,因為只有在商品數(shù)據(jù)更新的時候才會清空緩存,然后才會導(dǎo)致讀寫并發(fā),所以更新頻率不是太高的話,這個問題的影響并不是很大,但是確實有可能某些服務(wù)器的負(fù)載會高一些。
數(shù)據(jù)庫與緩存數(shù)據(jù)一致性解決方案流程圖
數(shù)據(jù)庫與緩存數(shù)據(jù)一致性解決方案對應(yīng)代碼
商品庫存實體
package com.shux.inventory.entity;
/**
**********************************************
* 描述:
* Simba.Hua
* 2017年8月30日
**********************************************
**/
public class InventoryProduct {
private Integer productId;
private Long InventoryCnt;
public Integer getProductId() {
return productId;
}
public void setProductId(Integer productId) {
this.productId = productId;
}
public Long getInventoryCnt() {
return InventoryCnt;
}
public void setInventoryCnt(Long inventoryCnt) {
InventoryCnt = inventoryCnt;
}
}
請求接口
/**
**********************************************
* 描述:
* Simba.Hua
* 2017年8月27日
**********************************************
**/
public interface Request {
public void process();
public Integer getProductId();
public boolean isForceFefresh();
}
數(shù)據(jù)更新請求
package com.shux.inventory.request;
import org.springframework.transaction.annotation.Transactional;
import com.shux.inventory.biz.InventoryProductBiz;
import com.shux.inventory.entity.InventoryProduct;
/**
**********************************************
* 描述:更新庫存信息
* 1、先刪除緩存中的數(shù)據(jù)
* 2、更新數(shù)據(jù)庫中的數(shù)據(jù)
* Simba.Hua
* 2017年8月30日
**********************************************
**/
public class InventoryUpdateDBRequest implements Request{
private InventoryProductBiz inventoryProductBiz;
private InventoryProduct inventoryProduct;
public InventoryUpdateDBRequest(InventoryProduct inventoryProduct,InventoryProductBiz inventoryProductBiz){
this.inventoryProduct = inventoryProduct;
this.inventoryProductBiz = inventoryProductBiz;
}
@Override
@Transactional
public void process() {
inventoryProductBiz.removeInventoryProductCache(inventoryProduct.getProductId());
inventoryProductBiz.updateInventoryProduct(inventoryProduct);
}
@Override
public Integer getProductId() {
// TODO Auto-generated method stub
return inventoryProduct.getProductId();
}
@Override
public boolean isForceFefresh() {
// TODO Auto-generated method stub
return false;
}
}
查詢請求
package com.shux.inventory.request;
import com.shux.inventory.biz.InventoryProductBiz;
import com.shux.inventory.entity.InventoryProduct;
/**
**********************************************
* 描述:查詢緩存數(shù)據(jù)
* 1、從數(shù)據(jù)庫中查詢
* 2、從數(shù)據(jù)庫中查詢后插入到緩存中
* Simba.Hua
* 2017年8月30日
**********************************************
**/
public class InventoryQueryCacheRequest implements Request {
private InventoryProductBiz inventoryProductBiz;
private Integer productId;
private boolean isForceFefresh;
public InventoryQueryCacheRequest(Integer productId,InventoryProductBiz inventoryProductBiz,boolean isForceFefresh) {
this.productId = productId;
this.inventoryProductBiz = inventoryProductBiz;
this.isForceFefresh = isForceFefresh;
}
@Override
public void process() {
InventoryProduct inventoryProduct = inventoryProductBiz.loadInventoryProductByProductId(productId);
inventoryProductBiz.setInventoryProductCache(inventoryProduct);
}
@Override
public Integer getProductId() {
// TODO Auto-generated method stub
return productId;
}
public boolean isForceFefresh() {
return isForceFefresh;
}
public void setForceFefresh(boolean isForceFefresh) {
this.isForceFefresh = isForceFefresh;
}
}
spring啟動時初始化隊列線程池
package com.shux.inventory.thread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.shux.inventory.request.Request;
import com.shux.inventory.request.RequestQueue;
import com.shux.utils.other.SysConfigUtil;
/**
**********************************************
* 描述:請求處理線程池,初始化隊列數(shù)及每個隊列最多能處理的數(shù)量
* Simba.Hua
* 2017年8月27日
**********************************************
**/
public class RequestProcessorThreadPool {
private static final int blockingQueueNum = SysConfigUtil.get("request.blockingqueue.number")==null?10:Integer.valueOf(SysConfigUtil.get("request.blockingqueue.number").toString());
private static final int queueDataNum = SysConfigUtil.get("request.everyqueue.data.length")==null?100:Integer.valueOf(SysConfigUtil.get("request.everyqueue.data.length").toString());
private ExecutorService threadPool = Executors.newFixedThreadPool(blockingQueueNum);
private RequestProcessorThreadPool(){
for(int i=0;iblockingQueueNum;i++){//初始化隊列
ArrayBlockingQueueRequest> queue = new ArrayBlockingQueueRequest>(queueDataNum);//每個隊列中放100條數(shù)據(jù)
RequestQueue.getInstance().addQueue(queue);
threadPool.submit(new RequestProcessorThread(queue));//把每個queue交個線程去處理,線程會處理每個queue中的數(shù)據(jù)
}
}
public static class Singleton{
private static RequestProcessorThreadPool instance;
static{
instance = new RequestProcessorThreadPool();
}
public static RequestProcessorThreadPool getInstance(){
return instance;
}
}
public static RequestProcessorThreadPool getInstance(){
return Singleton.getInstance();
}
/**
* 初始化線程池
*/
public static void init(){
getInstance();
}
}
請求處理線程
package com.shux.inventory.thread;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import com.shux.inventory.request.InventoryUpdateDBRequest;
import com.shux.inventory.request.Request;
import com.shux.inventory.request.RequestQueue;
/**
**********************************************
* 描述:請求處理線程
* Simba.Hua
* 2017年8月27日
**********************************************
**/
public class RequestProcessorThread implements CallableBoolean>{
private ArrayBlockingQueueRequest> queue;
public RequestProcessorThread(ArrayBlockingQueueRequest> queue){
this.queue = queue;
}
@Override
public Boolean call() throws Exception {
Request request = queue.take();
MapInteger,Boolean> flagMap = RequestQueue.getInstance().getFlagMap();
//不需要強制刷新的時候,查詢請求去重處理
if (!request.isForceFefresh()){
if (request instanceof InventoryUpdateDBRequest) {//如果是更新請求,那就置為false
flagMap.put(request.getProductId(), true);
} else {
Boolean flag = flagMap.get(request.getProductId());
/**
* 標(biāo)志位為空,有三種情況
* 1、沒有過更新請求
* 2、沒有查詢請求
* 3、數(shù)據(jù)庫中根本沒有數(shù)據(jù)
* 在最初情況,一旦庫存了插入了數(shù)據(jù),那就好會在緩存中也會放一份數(shù)據(jù),
* 但這種情況下有可能由于redis中內(nèi)存滿了,redis通過LRU算法把這個商品給清除了,導(dǎo)致緩存中沒有數(shù)據(jù)
* 所以當(dāng)標(biāo)志位為空的時候,需要從數(shù)據(jù)庫重查詢一次,并且把標(biāo)志位置為false,以便后面的請求能夠從緩存中取
*/
if ( flag == null) {
flagMap.put(request.getProductId(), false);
}
/**
* 如果不為空,并且flag為true,說明之前有一次更新請求,說明緩存中沒有數(shù)據(jù)了(更新緩存會先刪除緩存),
* 這個時候就要去刷新緩存,即從數(shù)據(jù)庫中查詢一次,并把標(biāo)志位設(shè)置為false
*/
if ( flag != null flag) {
flagMap.put(request.getProductId(), false);
}
/**
* 這種情況說明之前有一個查詢請求,并且把數(shù)據(jù)刷新到了緩存中,所以這時候就不用去刷新緩存了,直接返回就可以了
*/
if (flag != null !flag) {
flagMap.put(request.getProductId(), false);
return true;
}
}
}
request.process();
return true;
}
}
請求隊列
package com.shux.inventory.request;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
/**
**********************************************
* 描述:請求隊列
* Simba.Hua
* 2017年8月27日
**********************************************
**/
public class RequestQueue {
private ListArrayBlockingQueueRequest>> queues = new ArrayList>();
private MapInteger,Boolean> flagMap = new ConcurrentHashMap>();
private RequestQueue(){
}
private static class Singleton{
private static RequestQueue queue;
static{
queue = new RequestQueue();
}
public static RequestQueue getInstance() {
return queue;
}
}
public static RequestQueue getInstance(){
return Singleton.getInstance();
}
public void addQueue(ArrayBlockingQueueRequest> queue) {
queues.add(queue);
}
public int getQueueSize(){
return queues.size();
}
public ArrayBlockingQueueRequest> getQueueByIndex(int index) {
return queues.get(index);
}
public MapInteger,Boolean> getFlagMap() {
return this.flagMap;
}
}
spring 啟動初始化線程池類
package com.shux.inventory.listener;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import com.shux.inventory.thread.RequestProcessorThreadPool;
/**
**********************************************
* 描述:spring 啟動初始化線程池類
* Simba.Hua
* 2017年8月27日
**********************************************
**/
public class InitListener implements ApplicationListenerContextRefreshedEvent>{
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
// TODO Auto-generated method stub
if(event.getApplicationContext().getParent() != null){
return;
}
RequestProcessorThreadPool.init();
}
}
異步處理請求接口
package com.shux.inventory.biz;
import com.shux.inventory.request.Request;
/**
**********************************************
* 描述:請求異步處理接口,用于路由隊列并把請求加入到隊列中
* Simba.Hua
* 2017年8月30日
**********************************************
**/
public interface IRequestAsyncProcessBiz {
void process(Request request);
}
異步處理請求接口實現(xiàn)
package com.shux.inventory.biz.impl;
import java.util.concurrent.ArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import com.shux.inventory.biz.IRequestAsyncProcessBiz;
import com.shux.inventory.request.Request;
import com.shux.inventory.request.RequestQueue;
/**
**********************************************
* 描述:異步處理請求,用于路由隊列并把請求加入到隊列中
* Simba.Hua
* 2017年8月30日
**********************************************
**/
@Service("requestAsyncProcessService")
public class RequestAsyncProcessBizImpl implements IRequestAsyncProcessBiz {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void process(Request request) {
// 做請求的路由,根據(jù)productId路由到對應(yīng)的隊列
ArrayBlockingQueueRequest> queue = getQueueByProductId(request.getProductId());
try {
queue.put(request);
} catch (InterruptedException e) {
logger.error("產(chǎn)品ID{}加入隊列失敗",request.getProductId(),e);
}
}
private ArrayBlockingQueueRequest> getQueueByProductId(Integer productId) {
RequestQueue requestQueue = RequestQueue.getInstance();
String key = String.valueOf(productId);
int hashcode;
int hash = (key == null) ? 0 : (hashcode = key.hashCode())^(hashcode >>> 16);
//對hashcode取摸
int index = (requestQueue.getQueueSize()-1) hash;
return requestQueue.getQueueByIndex(index);
}
}
package com.shux.inventory.biz.impl;
import javax.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.shux.inventory.biz.InventoryProductBiz;
import com.shux.inventory.entity.InventoryProduct;
import com.shux.inventory.mapper.InventoryProductMapper;
import com.shux.redis.biz.IRedisBiz;
/**
**********************************************
* 描述
* Simba.Hua
* 2017年8月30日
**********************************************
**/
@Service("inventoryProductBiz")
public class InventoryProductBizImpl implements InventoryProductBiz {
private @Autowired IRedisBizInventoryProduct> redisBiz;
private @Resource InventoryProductMapper mapper;
@Override
public void updateInventoryProduct(InventoryProduct inventoryProduct) {
// TODO Auto-generated method stub
mapper.updateInventoryProduct(inventoryProduct);
}
@Override
public InventoryProduct loadInventoryProductByProductId(Integer productId) {
// TODO Auto-generated method stub
return mapper.loadInventoryProductByProductId(productId);
}
@Override
public void setInventoryProductCache(InventoryProduct inventoryProduct) {
redisBiz.set("inventoryProduct:"+inventoryProduct.getProductId(), inventoryProduct);
}
@Override
public void removeInventoryProductCache(Integer productId) {
redisBiz.delete("inventoryProduct:"+productId);
}
@Override
public InventoryProduct loadInventoryProductCache(Integer productId) {
// TODO Auto-generated method stub
return redisBiz.get("inventoryProduct:"+productId);
}
}
數(shù)據(jù)更新請求controller
package com.shux.inventory.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import com.shux.inventory.biz.IRequestAsyncProcessBiz;
import com.shux.inventory.biz.InventoryProductBiz;
import com.shux.inventory.entity.InventoryProduct;
import com.shux.inventory.request.InventoryUpdateDBRequest;
import com.shux.inventory.request.Request;
import com.shux.utils.other.Response;
/**
**********************************************
* 描述:提交更新請求
* Simba.Hua
* 2017年9月1日
**********************************************
**/
@Controller("/inventory")
public class InventoryUpdateDBController {
private @Autowired InventoryProductBiz inventoryProductBiz;
private @Autowired IRequestAsyncProcessBiz requestAsyncProcessBiz;
@RequestMapping("/updateDBInventoryProduct")
@ResponseBody
public Response updateDBInventoryProduct(InventoryProduct inventoryProduct){
Request request = new InventoryUpdateDBRequest(inventoryProduct,inventoryProductBiz);
requestAsyncProcessBiz.process(request);
return new Response(Response.SUCCESS,"更新成功");
}
}
數(shù)據(jù)查詢請求controller
package com.shux.inventory.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import com.shux.inventory.biz.IRequestAsyncProcessBiz;
import com.shux.inventory.biz.InventoryProductBiz;
import com.shux.inventory.entity.InventoryProduct;
import com.shux.inventory.request.InventoryQueryCacheRequest;
import com.shux.inventory.request.Request;
/**
**********************************************
* 描述:提交查詢請求
* 1、先從緩存中取數(shù)據(jù)
* 2、如果能從緩存中取到數(shù)據(jù),則返回
* 3、如果不能從緩存取到數(shù)據(jù),則等待20毫秒,然后再次去數(shù)據(jù),直到200毫秒,如果超過200毫秒還不能取到數(shù)據(jù),則從數(shù)據(jù)庫中取,并強制刷新緩存數(shù)據(jù)
* Simba.Hua
* 2017年9月1日
**********************************************
**/
@Controller("/inventory")
public class InventoryQueryCacheController {
private @Autowired InventoryProductBiz inventoryProductBiz;
private @Autowired IRequestAsyncProcessBiz requestAsyncProcessBiz;
@RequestMapping("/queryInventoryProduct")
public InventoryProduct queryInventoryProduct(Integer productId) {
Request request = new InventoryQueryCacheRequest(productId,inventoryProductBiz,false);
requestAsyncProcessBiz.process(request);//加入到隊列中
long startTime = System.currentTimeMillis();
long allTime = 0L;
long endTime = 0L;
InventoryProduct inventoryProduct = null;
while (true) {
if (allTime > 200){//如果超過了200ms,那就直接退出,然后從數(shù)據(jù)庫中查詢
break;
}
try {
inventoryProduct = inventoryProductBiz.loadInventoryProductCache(productId);
if (inventoryProduct != null) {
return inventoryProduct;
} else {
Thread.sleep(20);//如果查詢不到就等20毫秒
}
endTime = System.currentTimeMillis();
allTime = endTime - startTime;
} catch (Exception e) {
}
}
/**
* 代碼執(zhí)行到這來,只有以下三種情況
* 1、緩存中本來有數(shù)據(jù),由于redis內(nèi)存滿了,redis通過LRU算法清除了緩存,導(dǎo)致數(shù)據(jù)沒有了
* 2、由于之前數(shù)據(jù)庫查詢比較慢或者內(nèi)存太小處理不過來隊列中的數(shù)據(jù),導(dǎo)致隊列里擠壓了很多的數(shù)據(jù),所以一直沒有從數(shù)據(jù)庫中獲取數(shù)據(jù)然后插入到緩存中
* 3、數(shù)據(jù)庫中根本沒有這樣的數(shù)據(jù),這種情況叫數(shù)據(jù)穿透,一旦別人知道這個商品沒有,如果一直執(zhí)行查詢,就會一直查詢數(shù)據(jù)庫,如果過多,那么有可能會導(dǎo)致數(shù)據(jù)庫癱瘓
*/
inventoryProduct = inventoryProductBiz.loadInventoryProductByProductId(productId);
if (inventoryProduct != null) {
Request forcRrequest = new InventoryQueryCacheRequest(productId,inventoryProductBiz,true);
requestAsyncProcessBiz.process(forcRrequest);//這個時候需要強制刷新數(shù)據(jù)庫,使緩存中有數(shù)據(jù)
return inventoryProduct;
}
return null;
}
}
到此這篇關(guān)于詳解redis緩存與數(shù)據(jù)庫一致性問題解決的文章就介紹到這了,更多相關(guān)redis緩存與數(shù)據(jù)庫一致性內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
您可能感興趣的文章:- Redis緩存常用4種策略原理詳解
- 聊一聊Redis與MySQL雙寫一致性如何保證
- MySQL與Redis如何保證數(shù)據(jù)一致性詳解
- redis實現(xiàn)分布式的方法總結(jié)
- 面試常問:如何保證Redis緩存和數(shù)據(jù)庫的數(shù)據(jù)一致性