package org.apache.iot.flink.source;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.aliyuncs.CommonRequest;
import com.aliyuncs.DefaultAcsClient;
import io.opentracing.log.Fields;
import java.nio.charset.StandardCharsets;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.naming.InitialContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.data.RowData;
import org.apache.iot.flink.IotLinkPlatform;
import org.apache.iot.flink.amqp.AmqpClient;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.provider.amqp.AmqpSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iot/flink/source/IotSourceFunction.class */
public class IotSourceFunction extends RichSourceFunction<RowData> implements ResultTypeQueryable<RowData> {
    private static final Logger LOG = LoggerFactory.getLogger("test");
    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS");
    private static final ExecutorService executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2, 3, TimeUnit.SECONDS, new LinkedBlockingQueue(50000));
    private static final AtomicLong count = new AtomicLong(0);
    private final String regionId;
    private final String accessId;
    private final String accessKey;
    private final String tableName;
    private final String iotInstanceId;
    private String consumerGroupId;
    private final String clientId;
    private final String uid;
    private final Integer connectionCount;
    private final String[] fieldNames;
    private final IotDeserializer deserializationSchema;
    private volatile boolean isRunning = true;
    private Long startTime = Long.valueOf(System.currentTimeMillis());

    public IotSourceFunction(String str, String str2, String str3, String str4, String str5, String str6, String str7, String str8, Integer num, String[] strArr, IotDeserializer iotDeserializer) {
        this.regionId = str;
        this.accessId = str2;
        this.accessKey = str3;
        this.tableName = str4;
        this.iotInstanceId = str5;
        this.consumerGroupId = str6;
        this.clientId = str7;
        this.uid = str8;
        this.connectionCount = num;
        this.fieldNames = strArr;
        this.deserializationSchema = iotDeserializer;
    }

    public TypeInformation<RowData> getProducedType() {
        return TypeInformation.of(RowData.class);
    }

    public void run(SourceFunction.SourceContext<RowData> sourceContext) throws Exception {
        if (Objects.isNull(this.consumerGroupId)) {
            DefaultAcsClient link = IotLinkPlatform.link(this.regionId, this.accessId, this.accessKey);
            CommonRequest request = IotLinkPlatform.getRequest("Iot", "iot", "iot-vpc." + this.regionId + ".aliyuncs.com", "2021-04-06");
            String consumerGroupId = (this.tableName.contains("product") || this.tableName.contains(Fields.EVENT)) ? IotLinkPlatform.getConsumerGroupId(link, request, this.iotInstanceId, "${" + this.tableName + "}", null) : IotLinkPlatform.getConsumerGroupId(link, request, this.iotInstanceId, this.tableName, null);
            LOG.info("consumerGroupId ---- " + consumerGroupId);
            if (Objects.isNull(consumerGroupId)) {
                LOG.error("Not open stream job. Please check again.");
                this.isRunning = false;
            }
            this.consumerGroupId = consumerGroupId;
        }
        while (this.isRunning) {
            ArrayList arrayList = new ArrayList();
            for (int i = 0; i < this.connectionCount.intValue(); i++) {
                long currentTimeMillis = System.currentTimeMillis();
                String str = this.clientId + "-" + i + "|authMode=aksign,signMethod=hmacsha1,timestamp=" + currentTimeMillis + ",authId=" + this.accessId + ",iotInstanceId=" + this.iotInstanceId + ",consumerGroupId=" + this.consumerGroupId + AmqpSupport.SUB_NAME_DELIMITER;
                String doSign = AmqpClient.doSign("authId=" + this.accessId + "&timestamp=" + currentTimeMillis, this.accessKey, "hmacsha1");
                String str2 = "failover:(amqps://" + (this.iotInstanceId + "-vpc.amqp.iothub.aliyuncs.com") + ":5671?amqp.idleTimeout=80000)?failover.reconnectDelay=30";
                Hashtable hashtable = new Hashtable();
                hashtable.put("connectionfactory.SBCF", str2);
                hashtable.put("queue.QUEUE", "default");
                hashtable.put("java.naming.factory.initial", "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
                InitialContext initialContext = new InitialContext(hashtable);
                ConnectionFactory connectionFactory = (ConnectionFactory) initialContext.lookup("SBCF");
                Destination destination = (Destination) initialContext.lookup("QUEUE");
                Connection createConnection = connectionFactory.createConnection(str, doSign);
                arrayList.add(createConnection);
                ((JmsConnection) createConnection).addConnectionListener(AmqpClient.myJmsConnectionListener);
                Session createSession = createConnection.createSession(false, 1);
                createConnection.start();
                createSession.createConsumer(destination).setMessageListener(message -> {
                    try {
                        executorService.submit(() -> {
                            for (Map<String, Object> map : getResult(AmqpClient.processMessage(message).getMessage(), this.fieldNames)) {
                                LOG.info("message --- " + map);
                                LOG.info("data size ---- " + count);
                                count.getAndIncrement();
                                sourceContext.collect(this.deserializationSchema.deserializeIot(map, this.fieldNames));
                            }
                        });
                    } catch (Exception e) {
                        LOG.error("submit task occurs exception ", (Throwable) e);
                    }
                });
            }
            Thread.sleep(60000L);
            arrayList.forEach(connection -> {
                try {
                    connection.close();
                } catch (JMSException e) {
                    LOG.error("failed to close connection", (Throwable) e);
                }
            });
        }
    }

    public void cancel() {
        this.isRunning = false;
    }

    private List<Map<String, Object>> getResult(String str, String[] strArr) {
        ArrayList arrayList = new ArrayList();
        for (String str2 : JSONArray.parseArray(str, String.class)) {
            HashMap hashMap = new HashMap();
            JSONObject parseObject = JSONObject.parseObject(str2);
            try {
                for (String str3 : strArr) {
                    hashMap.put(str3, parseObject.get(str3));
                }
                if (hashMap.containsKey("body")) {
                    hashMap.put("body", new String(Base64.getDecoder().decode(hashMap.get("body").toString())));
                }
            } catch (Exception e) {
                LOG.error("No such field name.");
                this.isRunning = false;
            }
            arrayList.add(hashMap);
        }
        return arrayList;
    }
}
