package com.alibaba.ververica.connectors.redis.sink;

import com.alibaba.ververica.connectors.common.MetricUtils;
import com.alibaba.ververica.connectors.common.metrics.SimpleGauge;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Locale;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.StringUtils;
import redis.clients.jedis.JedisCommands;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/* loaded from: input_file:com/alibaba/ververica/connectors/redis/sink/RedisUpsertSinkFunction.class */
public class RedisUpsertSinkFunction extends RedisBaseSinkFunction<RowData> {
    private final String mode;
    private RedisDataType redisDataType;
    private RedisDataSinkHandler handler;
    private Meter outTps;
    private Meter outBps;
    private SimpleGauge latencyGauge;
    private final boolean ignoreDelete;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.alibaba.ververica.connectors.redis.sink.RedisUpsertSinkFunction$1, reason: invalid class name */
    /* loaded from: input_file:com/alibaba/ververica/connectors/redis/sink/RedisUpsertSinkFunction$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$alibaba$ververica$connectors$redis$sink$RedisDataType = new int[RedisDataType.values().length];

        static {
            try {
                $SwitchMap$com$alibaba$ververica$connectors$redis$sink$RedisDataType[RedisDataType.STRING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$alibaba$ververica$connectors$redis$sink$RedisDataType[RedisDataType.LIST.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$alibaba$ververica$connectors$redis$sink$RedisDataType[RedisDataType.SET.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$alibaba$ververica$connectors$redis$sink$RedisDataType[RedisDataType.SORTEDSET.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$com$alibaba$ververica$connectors$redis$sink$RedisDataType[RedisDataType.HASHMAP.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/ververica/connectors/redis/sink/RedisUpsertSinkFunction$RedisDataSinkHandler.class */
    public interface RedisDataSinkHandler {
        void handle(RowData rowData, JedisCommands jedisCommands);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/ververica/connectors/redis/sink/RedisUpsertSinkFunction$RedisHashMapSinkHandler.class */
    public class RedisHashMapSinkHandler implements RedisDataSinkHandler {
        RedisHashMapSinkHandler() {
        }

        @Override // com.alibaba.ververica.connectors.redis.sink.RedisUpsertSinkFunction.RedisDataSinkHandler
        public void handle(RowData rowData, JedisCommands jedisCommands) {
            RowKind rowKind = rowData.getRowKind();
            RedisUpsertSinkFunction.this.outBps.markEvent(rowData.getString(0).toString().getBytes().length + rowData.getString(1).toString().getBytes().length + rowData.getString(2).toString().getBytes().length);
            if (rowKind.equals(RowKind.INSERT) || rowKind.equals(RowKind.UPDATE_AFTER)) {
                HashMap hashMap = new HashMap();
                hashMap.put(rowData.getString(1).toString(), rowData.getString(2).toString());
                jedisCommands.hmset(rowData.getString(0).toString(), hashMap);
            } else if ((rowKind.equals(RowKind.DELETE) || rowKind.equals(RowKind.UPDATE_BEFORE)) && !RedisUpsertSinkFunction.this.ignoreDelete) {
                jedisCommands.hdel(rowData.getString(0).toString(), rowData.getString(1).toString());
            } else {
                RedisBaseSinkFunction.LOG.info("Ignore rowdata {}.", rowData);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/ververica/connectors/redis/sink/RedisUpsertSinkFunction$RedisListSinkHandler.class */
    public class RedisListSinkHandler implements RedisDataSinkHandler {
        RedisListSinkHandler() {
        }

        @Override // com.alibaba.ververica.connectors.redis.sink.RedisUpsertSinkFunction.RedisDataSinkHandler
        public void handle(RowData rowData, JedisCommands jedisCommands) {
            RowKind rowKind = rowData.getRowKind();
            RedisUpsertSinkFunction.this.outBps.markEvent(rowData.getString(0).toString().getBytes().length + rowData.getString(1).toString().getBytes().length);
            if (rowKind.equals(RowKind.INSERT) || rowKind.equals(RowKind.UPDATE_AFTER)) {
                jedisCommands.lpush(rowData.getString(0).toString(), rowData.getString(1).toString());
            } else if ((rowKind.equals(RowKind.DELETE) || rowKind.equals(RowKind.UPDATE_BEFORE)) && !RedisUpsertSinkFunction.this.ignoreDelete) {
                jedisCommands.lrem(rowData.getString(0).toString(), 1L, rowData.getString(1).toString());
            } else {
                RedisBaseSinkFunction.LOG.info("Ignore rowdata {}.", rowData);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/ververica/connectors/redis/sink/RedisUpsertSinkFunction$RedisSetSinkHandler.class */
    public class RedisSetSinkHandler implements RedisDataSinkHandler {
        RedisSetSinkHandler() {
        }

        @Override // com.alibaba.ververica.connectors.redis.sink.RedisUpsertSinkFunction.RedisDataSinkHandler
        public void handle(RowData rowData, JedisCommands jedisCommands) {
            RowKind rowKind = rowData.getRowKind();
            RedisUpsertSinkFunction.this.outBps.markEvent(rowData.getString(0).toString().getBytes().length + rowData.getString(1).toString().getBytes().length);
            if (rowKind.equals(RowKind.INSERT) || rowKind.equals(RowKind.UPDATE_AFTER)) {
                jedisCommands.sadd(rowData.getString(0).toString(), rowData.getString(1).toString());
            } else if ((rowKind.equals(RowKind.DELETE) || rowKind.equals(RowKind.UPDATE_BEFORE)) && !RedisUpsertSinkFunction.this.ignoreDelete) {
                jedisCommands.srem(rowData.getString(0).toString(), rowData.getString(1).toString());
            } else {
                RedisBaseSinkFunction.LOG.info("Ignore rowdata {}.", rowData);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/ververica/connectors/redis/sink/RedisUpsertSinkFunction$RedisSortedSetSinkHandler.class */
    public class RedisSortedSetSinkHandler implements RedisDataSinkHandler {
        RedisSortedSetSinkHandler() {
        }

        @Override // com.alibaba.ververica.connectors.redis.sink.RedisUpsertSinkFunction.RedisDataSinkHandler
        public void handle(RowData rowData, JedisCommands jedisCommands) {
            RowKind rowKind = rowData.getRowKind();
            RedisUpsertSinkFunction.this.outBps.markEvent(rowData.getString(0).toString().getBytes().length + 8 + rowData.getString(2).toString().getBytes().length);
            if (rowKind.equals(RowKind.INSERT) || rowKind.equals(RowKind.UPDATE_AFTER)) {
                jedisCommands.zadd(rowData.getString(0).toString(), rowData.getDouble(1), rowData.getString(2).toString());
            } else if ((rowKind.equals(RowKind.DELETE) || rowKind.equals(RowKind.UPDATE_BEFORE)) && !RedisUpsertSinkFunction.this.ignoreDelete) {
                jedisCommands.zrem(rowData.getString(0).toString(), rowData.getString(2).toString());
            } else {
                RedisBaseSinkFunction.LOG.info("Ignore rowdata {}.", rowData);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/ververica/connectors/redis/sink/RedisUpsertSinkFunction$RedisStringSinkHandler.class */
    public class RedisStringSinkHandler implements RedisDataSinkHandler {
        RedisStringSinkHandler() {
        }

        @Override // com.alibaba.ververica.connectors.redis.sink.RedisUpsertSinkFunction.RedisDataSinkHandler
        public void handle(RowData rowData, JedisCommands jedisCommands) {
            RowKind rowKind = rowData.getRowKind();
            RedisUpsertSinkFunction.this.outBps.markEvent(rowData.getString(0).toString().getBytes().length);
            if (rowKind.equals(RowKind.INSERT) || rowKind.equals(RowKind.UPDATE_AFTER)) {
                jedisCommands.set(rowData.getString(0).toString(), rowData.getString(1).toString());
            } else if ((rowKind.equals(RowKind.DELETE) || rowKind.equals(RowKind.UPDATE_BEFORE)) && !RedisUpsertSinkFunction.this.ignoreDelete) {
                jedisCommands.del(rowData.getString(0).toString());
            } else {
                RedisBaseSinkFunction.LOG.info("Ignore rowdata {}.", rowData);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RedisUpsertSinkFunction(String str, int i, boolean z, String str2, int i2, String str3, boolean z2) {
        super(str, i, z, str2, i2);
        this.mode = str3;
        this.ignoreDelete = z2;
    }

    @Override // com.alibaba.ververica.connectors.redis.sink.RedisBaseSinkFunction
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        initHandler();
        this.outTps = MetricUtils.registerNumRecordsOutRate(getRuntimeContext());
        this.outBps = MetricUtils.registerNumBytesOutRate(getRuntimeContext(), "redis");
        this.latencyGauge = MetricUtils.registerCurrentSendTime(getRuntimeContext());
    }

    private void initHandler() {
        this.redisDataType = RedisDataType.valueOf(this.mode.toUpperCase());
        switch (AnonymousClass1.$SwitchMap$com$alibaba$ververica$connectors$redis$sink$RedisDataType[this.redisDataType.ordinal()]) {
            case 1:
                this.handler = new RedisStringSinkHandler();
                return;
            case 2:
                this.handler = new RedisListSinkHandler();
                return;
            case 3:
                this.handler = new RedisSetSinkHandler();
                return;
            case DateUtils.RANGE_WEEK_CENTER /* 4 */:
                this.handler = new RedisSortedSetSinkHandler();
                return;
            case DateUtils.RANGE_MONTH_SUNDAY /* 5 */:
                this.handler = new RedisHashMapSinkHandler();
                return;
            default:
                throw new RuntimeException("Not a supported RedisDataType: " + this.mode);
        }
    }

    @Override // com.alibaba.ververica.connectors.redis.sink.RedisBaseSinkFunction
    protected JedisPool createJedisPool(Configuration configuration) {
        return new JedisPool(new JedisPoolConfig(), this.host, this.port, 3000, StringUtils.isNullOrWhitespaceOnly(this.password) ? null : this.password, this.dataBaseNum);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // com.alibaba.ververica.connectors.redis.sink.RedisBaseSinkFunction
    public void invoke(JedisCommands jedisCommands, RowData rowData, SinkFunction.Context context) throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        this.handler.handle(rowData, jedisCommands);
        this.outTps.markEvent();
        this.latencyGauge.report(System.currentTimeMillis() - currentTimeMillis, 1L);
    }
}
