package cn.mzhong.janytask.zookeeper;

import cn.mzhong.janytask.core.TaskContext;
import cn.mzhong.janytask.queue.LockedMessageDao;
import cn.mzhong.janytask.queue.Message;
import cn.mzhong.janytask.queue.QueueInfo;
import java.util.LinkedList;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/mzhong/janytask/zookeeper/ZookeeperMessageDao.class */
/**
 * {@link LockedMessageDao} implementation that stores queue messages as ZooKeeper nodes.
 *
 * <p>All nodes for a queue live beneath {@code <rootPath>/<queue id>} in four parent nodes:
 * <ul>
 *   <li>{@code wait}  - messages pushed and not yet finished;</li>
 *   <li>{@code done}  - messages reported through {@link #done(Message)};</li>
 *   <li>{@code error} - messages reported through {@link #error(Message)};</li>
 *   <li>{@code lock}  - one ephemeral node per message id that is currently locked.</li>
 * </ul>
 * Message nodes are named after {@code message.getId()} and hold the message as serialized by
 * the inherited {@code dataSerializer}.
 */
public class ZookeeperMessageDao extends LockedMessageDao {
    static final Logger Log = LoggerFactory.getLogger(ZookeeperMessageDao.class);

    /** Connect string handed to {@link ZookeeperClient}. */
    protected String connectString;
    /** Client used for every ZooKeeper operation of this DAO. */
    protected ZookeeperClient zkClient;
    /** Parent node of messages that are waiting. */
    protected String waitPath;
    /** Parent node of messages that finished successfully. */
    protected String donePath;
    /** Parent node of messages that finished with an error. */
    protected String errorPath;
    /** Parent node of the per-message lock nodes. */
    protected String lockPath;
    /** Normalised root path: starts with "/" unless empty, never ends with "/". */
    protected String rootPath;

    /**
     * Remembers the connect string and creates the {@link ZookeeperClient} for it.
     *
     * @param str ZooKeeper connect string
     */
    public void initZookeeperClient(String str) {
        this.connectString = str;
        this.zkClient = new ZookeeperClient(str);
    }

    /**
     * Computes the {@code wait}/{@code done}/{@code error}/{@code lock} parent paths for this
     * queue and asks the client to create each of them as a persistent node.
     *
     * <p>Requires {@link #initZookeeperClient(String)} and {@link #initRootPath(String)} to have
     * run first, because it reads {@code zkClient} and {@code rootPath}.
     */
    public void initParentPath() {
        String queuePath = this.rootPath + "/" + this.queueInfo.ID();
        this.waitPath = queuePath + "/wait";
        this.donePath = queuePath + "/done";
        this.errorPath = queuePath + "/error";
        this.lockPath = queuePath + "/lock";
        // NOTE(review): this relies on ZookeeperClient.create tolerating an already existing
        // node and creating missing parents - confirm against ZookeeperClient.
        this.zkClient.create(this.waitPath, null, CreateMode.PERSISTENT);
        this.zkClient.create(this.donePath, null, CreateMode.PERSISTENT);
        this.zkClient.create(this.errorPath, null, CreateMode.PERSISTENT);
        this.zkClient.create(this.lockPath, null, CreateMode.PERSISTENT);
    }

    /**
     * Stores the root path in normalised form: a leading "/" is added when missing and any
     * trailing "/" is removed.
     *
     * <p>Child paths are built as {@code rootPath + "/" + ...}; without removing the trailing
     * slash a root such as {@code "/"} or {@code "janytask/"} would produce a path containing an
     * empty segment ({@code "//"}), which is not a valid ZooKeeper path. A root of {@code "/"}
     * (or {@code ""}) is therefore stored as the empty string.
     *
     * @param str configured root path, with or without a leading/trailing slash
     */
    public void initRootPath(String str) {
        String normalized = str.startsWith("/") ? str : "/" + str;
        int end = normalized.length();
        while (end > 0 && normalized.charAt(end - 1) == '/') {
            end--;
        }
        this.rootPath = normalized.substring(0, end);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the DAO, connects to ZooKeeper and prepares the parent nodes of the queue.
     *
     * @param taskContext context passed on to {@link LockedMessageDao}
     * @param queueInfo   queue description passed on to {@link LockedMessageDao}; its
     *                    {@code ID()} becomes part of every node path
     * @param str         ZooKeeper connect string
     * @param str2        root path under which the queue's nodes are kept
     */
    public ZookeeperMessageDao(TaskContext taskContext, QueueInfo queueInfo, String str, String str2) {
        super(taskContext, queueInfo);
        // Order matters: initParentPath reads both zkClient and rootPath.
        initZookeeperClient(str);
        initRootPath(str2);
        initParentPath();
    }

    /**
     * Serializes the message and creates a persistent node for it under the given parent.
     *
     * @param str     parent path the message node is created in
     * @param message message to store; its id is used as the node name
     * @throws RuntimeException wrapping any failure of serialization or node creation
     *                          (message text: "Error pushing message!")
     */
    protected void push(String str, Message message) {
        try {
            String nodePath = str + "/" + message.getId();
            byte[] payload = this.dataSerializer.serialize(message);
            this.zkClient.create(nodePath, payload, CreateMode.PERSISTENT);
        } catch (Exception e) {
            throw new RuntimeException("推送消息出错！", e);
        }
    }

    /**
     * Deletes the node of the message under the given parent.
     *
     * @param str     parent path the message node lives in
     * @param message message whose node is removed
     */
    protected void delete(String str, Message message) {
        this.zkClient.delete(str + "/" + message.getId());
    }

    /**
     * Adds the message to the waiting area.
     *
     * @param message message to enqueue
     */
    @Override // cn.mzhong.janytask.queue.MessageDao
    public void push(Message message) {
        push(this.waitPath, message);
    }

    /**
     * Moves the message from the waiting area to the done area and releases its lock.
     *
     * @param message message that was processed successfully
     */
    @Override // cn.mzhong.janytask.queue.MessageDao
    public void done(Message message) {
        moveAndUnlock(this.donePath, message);
    }

    /**
     * Moves the message from the waiting area to the error area and releases its lock.
     *
     * @param message message whose processing failed
     */
    @Override // cn.mzhong.janytask.queue.MessageDao
    public void error(Message message) {
        moveAndUnlock(this.errorPath, message);
    }

    /**
     * Copies the message to {@code targetPath}, removes it from the waiting area and always
     * releases the message lock afterwards.
     *
     * <p>The unlock sits in {@code finally} so that a failing push or delete cannot leave the
     * ephemeral lock node behind for the remaining lifetime of this client's session. If the
     * unlock itself throws, that exception replaces the one from the try block.
     */
    private void moveAndUnlock(String targetPath, Message message) {
        try {
            push(targetPath, message);
            delete(this.waitPath, message);
        } finally {
            unLock(message.getId());
        }
    }

    /**
     * @return number of child nodes currently under the waiting path
     */
    @Override // cn.mzhong.janytask.queue.MessageDao
    public long length() {
        return this.zkClient.getChildren(this.waitPath).size();
    }

    /**
     * Tries to lock a message by creating an ephemeral node named after its id under the lock
     * path.
     *
     * @param str message id
     * @return whatever {@code ZookeeperClient.create} reports - presumably {@code false} when
     *         the lock node already exists (confirm against ZookeeperClient)
     */
    @Override // cn.mzhong.janytask.queue.LockedMessageDao
    protected boolean lock(String str) {
        return this.zkClient.create(this.lockPath + "/" + str, null, CreateMode.EPHEMERAL);
    }

    /**
     * Releases a message lock by deleting its node under the lock path.
     *
     * @param str message id
     * @return always {@code true}
     */
    @Override // cn.mzhong.janytask.queue.LockedMessageDao
    protected boolean unLock(String str) {
        this.zkClient.delete(this.lockPath + "/" + str);
        return true;
    }

    /**
     * Reads and deserializes the waiting message with the given id.
     *
     * @param str message id
     * @return the message, or {@code null} when reading or deserializing throws; the failure is
     *         logged (log text: "Message deserialization failed, message ignored! Message ID:")
     */
    @Override // cn.mzhong.janytask.queue.LockedMessageDao
    protected Message get(String str) {
        try {
            return (Message) this.dataSerializer.deserialize(this.zkClient.getData(this.waitPath + "/" + str));
        } catch (Exception e) {
            Log.error("消息反序列化出错，消息已被忽略！消息ID:" + str, e);
            return null;
        }
    }

    /**
     * @return a new list holding the names of the child nodes under the waiting path, i.e. the
     *         ids of the waiting messages
     */
    @Override // cn.mzhong.janytask.queue.LockedMessageDao
    protected LinkedList<String> queueIdList() {
        return new LinkedList<>(this.zkClient.getChildren(this.waitPath));
    }
}
