编程开发 购物 网址 游戏 小说 歌词 地图 快照 股票 美女 新闻 笑话 | 汉字 软件 日历 阅读 下载 图书馆 开发 租车 短信 China
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
多播视频美女直播
↓电视,电影,美女直播,迅雷资源↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
移动开发 架构设计 编程语言 互联网 开发经验 Web前端 开发总结
开发杂谈 系统运维 研发管理 数据库 云 计 算 Java开发
VC(MFC) Delphi VB C++(C语言) C++ Builder 其它开发语言 云计算 Java开发 .Net开发 IOS开发 Android开发 PHP语言 JavaScript
ASP语言 HTML(CSS) HTML5 Apache MSSQL数据库 Oracle数据库 PowerBuilder Informatica 其它数据库 硬件及嵌入式开发 Linux开发资料
  编程开发知识库 -> 互联网 -> 单机任务重试机制——重启不丢任务 -> 正文阅读
 

[互联网]单机任务重试机制——重启不丢任务[第1页]

任务失败重试机制
某些场景下业务失败需要重试,比如说状态通知第三方,发奖品,发短信等等。总体的思路是将任任务现场记录下来,然后稍后重试。方案有多种
任务现场存储在db中,使用分布式任务调度统一去执行。 任务现场存储在文件中, 各自重试。
方案二的依赖少,而且能做到各个机器的隔离,但是没有HA。方案一具有HA,但实现与机器相关的业务场景比较难,比如某台机器缓存更新失败,需要稍后重试,而且引入的依赖会很多。考虑到机器的故障率不高,且为了尽量减少依赖,采用了方案二。
本地持久化队列实现任务失败重试
使用了Berkeley DB实现了本地持久化队列。 每个任务需要指定berkeleydb的存储路径,以及任务的名字。框架封装了底层的持久化队列,对外只暴露失败执行策略和失败执行的业务处理回调。使用的设计模式是组合+策略
RetryPolicy: 定义任务是否要执行,执行的频率。 这个可以复用 RetryCallBack: 定义任务重试时 实际要执行的业务逻辑。 例子

/**
 * Created by carey on 2017/9/20 11:14:46.
 */
public class ExampleRetryManager {

    /**
     * spring使用方式就用属性注入
     */
    private String taskSaveLocation = "/tmp/project/retryTask";

    /**
     * spring使用方式就用属性注入
     */
    private String taskDbName = "taskName";

    private RetryTemplate<ExampleTaskParam> retryTemplate;


    /**
     * 必须要调用的初始化方法
     * spring使用方式就用PostConstruct
     */
    public void init() {
        retryTemplate = new RetryTemplate<ExampleTaskParam>(
                new RetryCallBack<ExampleTaskParam>() {
                    @Override
                    public boolean doRetry(TaskContext<ExampleTaskParam> taskContext) {
                        ExampleTaskParam taskPacket = taskContext.getTaskParam();
                        return false;
                    }
                },
                new ExampleRetryPolicy<ExampleTaskParam>(),
                taskSaveLocation,
                "test");

        retryTemplate.init();
    }


    /**
     * 增加重试任务
     * @param taskContext
     */
    public void addRetry(TaskContext<ExampleTaskParam> taskContext) {
        retryTemplate.addTask(taskContext);
    }

}


/**
 * 注意数组越界错误
 * @param <T>
 */
class ExampleRetryPolicy<T extends Serializable> implements RetryPolicy<T> {

    private static int[] RETRY_DELAY_MINS = {1, 1, 1, 1, 1, 1, 1, 1};

    @Override
    public boolean isValid(TaskContext<T> taskContext) {
        return taskContext != null && taskContext.getCurExecuteCnt() < RETRY_DELAY_MINS.length - 1;
    }

    @Override
    public boolean isReady(TaskContext<T> taskContext) {
        return taskContext.getNextExecuteTime() < new Date().getTime();
    }

    @Override
    public long getNextExecuteTime(TaskContext<T> taskContext) {
        long timeDelay = RETRY_DELAY_MINS[taskContext.getCurExecuteCnt()] * 1000 * 60;
        return new Date().getTime() + timeDelay;
    }
}

任务对象参数

/**
 *
 * 必须要实现序列化接口
 * Created by carey on 2017/9/20 11:15:01.
 */
public class ExampleTaskParam implements Serializable {
    private static final long serialVersionUID = -8624662207361962314L;

    private int a;
    private String b;

    @Override
    public String toString() {
        return "TestRetryDo{" +
                "a=" + a +
                ", b='" + b + '\'' +
                '}';
    }

    public int getA() {
        return a;
    }

    public void setA(int a) {
        this.a = a;
    }

    public String getB() {
        return b;
    }

    public void setB(String b) {
        this.b = b;
    }
}

注意事项
任务参数必须要实现序列化接口,这个框架已经实现 任务参数对象在放入队列后,再去修改参数,是不会修改到队列中的参数(这个时候就已经序列化存储到文件中) 不同的任务会使用不同的任务参数,在构造任务管理类的时候需要保证不同任务使用不同的任务名字,否则会出现castException.(后面框架会保证同一个env环境路径下的相同的任务名称只能对应一个taskParam的类型) 底层实现 对于Berkeley DB的管理,由于一个berkeley DB的一个env环境可以管理多个db,在关闭db的时候通常需要关闭db。在一个env管理db1和db2时,如果当db1关闭时,同时关闭了env,那么db2也将无法使用,所以提供了一个工厂类统一管理env和db。在所有的db都关闭时,才会关闭env。 将重试任务管理成一个队列,使用berkeleyDB mapview的特性
具体的代码:env包装类

    /**
     * 单例工厂模式  同一个env下可以可以管理多个db
     * 避免同一个路径下同时创建多个env
     * Author: yuncong
     * Author: carey
     * Created at: 2012-8-6 13:22:12
     */
    public class BdbEnvironment {
        private static final String CLASS_CATALOG = "java_class_catalog";

        private Environment environment;
        //这个db单独存储类型信息,通过javaCataLog暴露出去给外部
        private Database catalogDb;
        private StoredClassCatalog javaCatalog;

        private static Map<String, BdbEnvironment> envs = new ConcurrentHashMap<>();


        public BdbEnvironment(String envHome)
                throws DatabaseException {
            File home = new File(envHome);
            if (!home.exists()) {
                boolean success = home.mkdirs();
                if (!success) {
                    throw new RuntimeException();
                }
            }

            try {
                EnvironmentConfig environmentConfig = new EnvironmentConfig();
                environmentConfig.setTransactional(true);
                environmentConfig.setAllowCreate(true);

                environmentConfig.setDurability(Durability.COMMIT_WRITE_NO_SYNC);

                environment = new Environment(home, environmentConfig);

                DatabaseConfig dbConfig = new DatabaseConfig();
                dbConfig.setTransactional(true);
                dbConfig.setAllowCreate(true);

                catalogDb = environment.openDatabase(null, CLASS_CATALOG, dbConfig);
                javaCatalog = new StoredClassCatalog(catalogDb);

            } catch (DatabaseException dbe) {
                throw new RuntimeException();
            }
        }


        /**
         * 关闭单个环境
         */
        public void close() {
            if (catalogDb != null) catalogDb.close();
            if (javaCatalog != null) javaCatalog.close();
            if (environment != null) environment.close();
        }

        public Environment getEnvironment() {
            return environment;
        }

        public StoredClassCatalog getJavaCatalog() {
            return javaCatalog;
        }
    }

任务队列抽象

    /**
     * 基于 Berkeley DB 的持久化队列 并非是标准化的java队列
     * 1. 不允许直接清空
     * 
     * Created by carey on 2017/9/8.
     */
    public class BdbQueue<E extends Serializable> extends AbstractQueue<E> {

        private BdbEnvironment bdbEnvironment;

        private Database queueDb;             // 数据库,用于保存值,使得支持队列持久化,无需序列化

        /**
         * Thread safe?
         */
        private StoredSortedMap<Long, E> queueMap;   //持久化Map,Key为指针位置,Value为值,

        private TransactionRunner transactionRunner;

        /**
         * 队列的名字,不同队列的名字就相当于不同的表
         */
        private String queueName = "";


        private Object syncObj = new Object();


        private static final String MESSAGE_STORE = "message_store";


        /**
         * 构造方法
         * @param environment bdb环境配置
         * @param dbName  db的名字  一个db对应一个queue
         */
        BdbQueue(BdbEnvironment environment, String dbName) {
            DatabaseConfig dbConfig = new DatabaseConfig();
            dbConfig.setTransactional(true);
            dbConfig.setAllowCreate(true);

            this.bdbEnvironment = environment;
            this.queueName = dbName;
            queueDb = bdbEnvironment.getEnvironment().openDatabase(null, MESSAGE_STORE + this.getQueueName(), dbConfig);

            EntryBinding messageKeyBinding = new SerialBinding<Long>(bdbEnvironment.getJavaCatalog(), Long.class);
            EntryBinding messageValueBinding = new SerialBinding<Object>(bdbEnvironment.getJavaCatalog(), Object.class);

            queueMap = new StoredSortedMap(queueDb, messageKeyBinding, messageValueBinding, true);
            transactionRunner = new TransactionRunner(bdbEnvironment.getEnvironment());
        }


        /**
         *
         */
        public void close() {
            if (queueDb != null)
                queueDb.close();
        }


        @Override
        public Iterator<E> iterator() {
            return queueMap.values().iterator();
        }

        @Override
        public int size() {
            return queueMap.size();
        }

        @Override
        public boolean offer(E e) {
            if (null == e) {
                throw new IllegalArgumentException("原始不允许为空");
            }
            synchronized (syncObj) {
                Long lastKey = queueMap.lastKey();
                lastKey = (lastKey == null) ? 1L : lastKey + 1L;
                queueMap.put(lastKey, e);
            }
            return true;
        }


        @Override
        public E poll() {
            synchronized (syncObj) {
                Long firstKey;
                E val;
                if ((firstKey = queueMap.firstKey()) == null || (val = queueMap.get(firstKey)) == null) {
                    return null;
                }
                return queueMap.remove(firstKey);
            }
        }

        @Override
        public E peek() {
            synchronized (syncObj) {
                Long firstKey;
                E val;
                if ((firstKey = queueMap.firstKey()) == null || (val = queueMap.get(firstKey)) == null) {
                    return null;
                }
                return val;
            }
        }

        @Override
        public void clear() {
            throw new UnsupportedOperationException();
        }

        public Database getQueueDb() {
            return queueDb;
        }


        public StoredSortedMap<Long, E> getQueueMap() {
            return queueMap;
        }


        public String getQueueName() {
            return queueName;
        }

        public BdbEnvironment getBdbEnvironment() {
            return bdbEnvironment;
        }

        public TransactionRunner getTransactionRunner() {
            return transactionRunner;
    }
}   

任务队列工厂

/**
 * Created by carey on 2017/9/19 15:17:46.
 */
public class BdbQueueFactory {

    private static ConcurrentHashMap<String/*envPath*/, BdbEnvironment> bdbEnvironments = new ConcurrentHashMap<>();

    private static ConcurrentHashMap<String/*envPath*/, Map<String/*dbName*/, BdbQueue>> env2QueuesMap = new ConcurrentHashMap<>();


    private static Object staticSyncObj = new Object();


    private static Logger logger = LoggerFactory.getLogger(BdbQueueFactory.class);

    static {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                closeAll();
            }
        });
    }



    public static  BdbQueue getQueue(String envPath, String queueName) {
        Map<String/*queueName*/, BdbQueue> queueMap = env2QueuesMap.get(envPath);
        if (null != queueMap) {
            BdbQueue queue = queueMap.get(queueName);
            if (null != queue) {
                return queue;
            }
        }

        synchronized (staticSyncObj) {
            queueMap = env2QueuesMap.get(envPath);
            if (null != queueMap) {
                BdbQueue queue = queueMap.get(queueName);
                if (null != queue) {
                    return queue;
                }
            }
            //如果环境的路径的共用
            BdbEnvironment bdbEnvironment = bdbEnvironments.get(envPath);
            if (bdbEnvironment == null) {
                bdbEnvironment = new BdbEnvironment(envPath);
                bdbEnvironments.put(envPath, bdbEnvironment);
                env2QueuesMap.put(envPath, new ConcurrentHashMap<String, BdbQueue>());
            }
            BdbQueue queue =  new BdbQueue(bdbEnvironment, queueName);
            env2QueuesMap.get(envPath).put(queueName, queue);
            return queue;
        }

    }


    /**
     * 清理工作
     */
    private static void closeAll() {
        for (Map.Entry<String/*envPath*/, Map<String/*dbName*/, BdbQueue>> queueMapEntry : env2QueuesMap.entrySet()) {
            Map<String, BdbQueue> queueMap = queueMapEntry.getValue();
            for (Map.Entry<String, BdbQueue> tmp : queueMap.entrySet()) {
                String queueName = tmp.getKey();
                BdbQueue queue = tmp.getValue();
                try {
                    queue.close();
                    logger.info("queue:{} close success", queueName);
                } catch (Exception e) {
                    logger.error("queue:{} close error ", queueName, e);
                }
            }
            String envPath = queueMapEntry.getKey();
            BdbEnvironment environment = bdbEnvironments.get(envPath);
            try{
                environment.close();
                logger.info("environment path:{} close success", envPath);
            }catch(Exception e) {
                logger.error("environment path:{} close error", envPath, e);
            }
        }

    }
}           

真正的任务队列抽象

/**
 * 在没有任务的情况下一直自旋,等待任务入队的唤醒
 * Created by carey on 2017/9/13.
 */
public class RetryTemplate<T extends Serializable> {


    private BdbQueue<TaskContext<T>> taskQueue;

    private volatile boolean running = true;

    private AtomicBoolean isInited = new AtomicBoolean(false);

    private final RetryCallBack<T> retryCallBack;

    private final RetryPolicy<T> retryPolicy;

    private final String envPath;
    private final String queueName;


    private Object syncObj = new Object();

    protected Logger logger = LoggerFactory.getLogger(this.getClass());

    public RetryTemplate(RetryCallBack<T> retryCallBack,
                         RetryPolicy<T> retryPolicy,
                         String envPath,
                         String queueName) {
        if (StringUtils.isEmpty(envPath) || StringUtils.isEmpty(queueName)) {
            throw new IllegalArgumentException("envPath or queueName should not be null");
        }
        if (null == retryCallBack || null == retryPolicy) {
            throw new IllegalArgumentException("retryCallBack or retryCallBack should not be null");
        }
        this.retryPolicy = retryPolicy;
        this.retryCallBack = retryCallBack;
        this.envPath = envPath;
        this.queueName = queueName;
    }


    /**
     * 初始化方法,子类必须要调用
     *
     */
    public void init() {
        if (!isInited.compareAndSet(false, true)) {
            logger.info("{} has already inited", this.getClass());
            return;
        }

        taskQueue = BdbQueueFactory.getQueue(envPath, queueName);
        Thread retryThread = new Thread(new RetryWorker());
        retryThread.setDaemon(true);
        retryThread.setName(queueName);
        retryThread.start();

        registerShutdownHook();
    }

    private void registerShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                running = false;
            }
        });
    }


    private class RetryWorker implements Runnable {
        @Override
        public void run() {
            while (running) {
                {
                    sleep(500);
                    TaskContext<T> taskContext = waitUntilGetTask();
                    if(!retryPolicy.isValid(taskContext)) {
                        logger.info("taskContext:{} is inValid, ignore", taskContext );
                        continue;
                    }

                    // 执行时间还没到,再次放入队列
                    if (!retryPolicy.isReady(taskContext)) {
                        addTask(taskContext);
                        continue;
                    }

                    try {
                        boolean isRetrySuc = retryCallBack.doRetry(taskContext);
                        logger.info("执行任务,结果:{} 任务现场:{}", isRetrySuc, taskContext);
                        if (!isRetrySuc) {
                            tryAgainWhenFailed(taskContext);
                            logger.info("任务执行失败,重新加入队列,任务现场为:{}", taskContext);
                        }
                    } catch (Exception e) {
                        logger.error("执行重试任务失败, 任务现场:{}", taskContext, e);
                        tryAgainWhenFailed(taskContext);
                        logger.info("任务执行异常,重新加入队列,任务现场为:{}", taskContext);
                    }
                }
            }
        }
    }

    /**
     * 失败时重试
     * 1. 更新任务现场
     * 2. 重新入队
     *
     * @param taskContext
     */
    private void tryAgainWhenFailed(TaskContext<T> taskContext) {
        long nextExecuteTime = retryPolicy.getNextExecuteTime(taskContext);
        taskContext.setNextExecuteTime(nextExecuteTime);
        taskContext.setCurExecuteCnt(taskContext.getCurExecuteCnt() + 1);
        addTask(taskContext);
    }


    private void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            logger.error("中断异常", e);
        }
    }

    /**
     * 自旋 直到获取到任务
     *
     * @return
     */
    private TaskContext<T> waitUntilGetTask() {
        TaskContext<T> taskPacket;
        synchronized (syncObj) {
            while ((taskPacket = taskQueue.poll()) == null) {
                try {
                    syncObj.wait();
                } catch (InterruptedException e) {
                    logger.error("中断异常", e);
                }
            }
        }
        return taskPacket;
    }

    /**
     * 增加任务
     *
     * @param taskPacket
     */
    public void addTask(TaskContext<T> taskPacket) {
        if (null == taskPacket) {
            throw new IllegalArgumentException("taskPacket should not be null");
        }
        synchronized (syncObj) {
            taskQueue.offer(taskPacket);
            syncObj.notify();
        }
    }
}

任务现场定义

/**
 * 任务现场描述
 * Created by carey on 2017/9/15 17:45:53.
 */
public class TaskContext<T extends Serializable> implements Serializable {
    private static final long serialVersionUID = -5568412953853536289L;

    /**
     * 期望的下次执行时间,不到期望时间不会执行
     * 不保证到了执行时间立即执行
     */
    private long nextExecuteTime;


    /**
     * 当前被执行的次数
     */
    private int curExecuteCnt;

    /**
     * 任务的具体参数
     */
    private T taskParam;


    public long getNextExecuteTime() {
        return nextExecuteTime;
    }

    public void setNextExecuteTime(long nextExecuteTime) {
        this.nextExecuteTime = nextExecuteTime;
    }

    public int getCurExecuteCnt() {
        return curExecuteCnt;
    }

    public void setCurExecuteCnt(int curExecuteCnt) {
        this.curExecuteCnt = curExecuteCnt;
    }

    public T getTaskParam() {
        return taskParam;
    }

    public void setTaskParam(T taskParam) {
        this.taskParam = taskParam;
    }

    @Override
    public String toString() {
        return "TaskContext{" +
                "nextExecuteTime=" + nextExecuteTime +
                ", curExecuteCnt=" + curExecuteCnt +
                ", taskParam=" + taskParam +
                '}';
    }
}

回调重试定义

/**
 * 回调重试
 * Created by carey on 2017/9/19 17:14:00.
 */
public interface RetryCallBack<T extends Serializable> {

    boolean doRetry(TaskContext<T> taskParam);
}

回调重试策略定义

/**
 * Created by carey on 2017/9/19 17:16:02.
 */
public interface RetryPolicy<T extends Serializable> {

    /**
     * 任务是否还有效,如果无效 直接丢弃
     * 请注意判空
     * @param taskContext
     * @return
     */
    boolean isValid(TaskContext<T> taskContext);

    /**
     * 是否到了执行时间
     * @param taskContext
     * @return
     */
    boolean isReady(TaskContext<T> taskContext);

    /**
     * 失败的时候更新下一次的执行时间
     *
     * @param taskContext
     * @return
     */
    long getNextExecuteTime(TaskContext<T> taskContext);

    /**
     * 默认的降幂策略
     * @param <T>
     */
    class DefaultDescending<T extends Serializable> implements RetryPolicy<T> {

        private static int[] RETRY_DELAY_MINS = {1, 2, 4, 8, 16, 32, 64, 128};

        @Override
        public boolean isValid(TaskContext<T> taskContext) {
            return taskContext != null && taskContext.getCurExecuteCnt() < RETRY_DELAY_MINS.length - 1;
        }

        @Override
        public boolean isReady(TaskContext<T> taskContext) {
            return taskContext.getNextExecuteTime() < new Date().getTime();

        }

        @Override
        public long getNextExecuteTime(TaskContext<T> taskContext) {
            long timeDelay =  RETRY_DELAY_MINS[taskContext.getCurExecuteCnt()] * 1000 * 60;
            return new Date().getTime() + timeDelay;
        }
    }
}

需要的依赖

<dependency>
    <groupId>com.sleepycat</groupId>
    <artifactId>je</artifactId>
    <version>5.0.73</version>
</dependency>

最后发一波广告
曹操专车招java开发,技术专家,架构师and so on..
坐标: 杭州
内推地址: yu.teng1@geely.com
  互联网 最新文章
Stanford 英文词性标注(Part-of-speech)缩
基于窗口的实时统计
求解矩阵最短路径问题
SSL握手通信详解及linux下c/c++ SSL Socket
关于服务器上(Docker中)运行Java程序时区
python爬虫系列(六):强大的beautifulsou
[计算机网络笔记]第四部分——网络层 选路算
11.28 北京,念腾讯暑假,不思则惘吧!
web安全之
滑块验证码识别 java版本
上一篇文章      下一篇文章      查看所有文章
加:2017-09-28 16:13:45  更:2017-09-28 16:13:49 
VC(MFC) Delphi VB C++(C语言) C++ Builder 其它开发语言 云计算 Java开发 .Net开发 IOS开发 Android开发 PHP语言 JavaScript
ASP语言 HTML(CSS) HTML5 Apache MSSQL数据库 Oracle数据库 PowerBuilder Informatica 其它数据库 硬件及嵌入式开发 Linux开发资料
360图书馆 软件开发资料 文字转语音 购物精选 软件下载 美食菜谱 新闻资讯 电影视频 小游戏 Chinese Culture 股票 租车
生肖星座 三丰软件 视频 开发 短信 中国文化 网文精选 搜图网 美图 阅读网 多播 租车 短信 看图 日历 万年历 2018年6日历
2018-6-21 8:59:16
多播视频美女直播
↓电视,电影,美女直播,迅雷资源↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  编程开发知识库