package org.apache.iot.flink.amqp;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.InitialContext;
import org.apache.commons.codec.binary.Base64;
import org.apache.iot.flink.domain.AmqpDTO;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.provider.amqp.AmqpSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iot/flink/amqp/AmqpClient.class */
/**
 * Static helper that opens one or more AMQP consumer connections through the Qpid JMS client
 * and hands every received message to a bounded worker pool.
 *
 * <p>All members are static; the class keeps no per-instance state.
 */
public class AmqpClient {
    // NOTE(review): the logger category is the literal "test" rather than the class name.
    // It is kept as-is because existing logging configuration may key on it - confirm before renaming.
    private static final Logger LOG = LoggerFactory.getLogger("test");

    /** Maximum number of pending message tasks before submissions are rejected. */
    private static final int WORKER_QUEUE_CAPACITY = 50000;

    /** Idle time, in seconds, after which worker threads above the core size are released. */
    private static final long WORKER_KEEP_ALIVE_SECONDS = 6000L;

    /** Time, in milliseconds, that {@link #amqpConnect} blocks after all connections are set up. */
    private static final long POST_CONNECT_WAIT_MILLIS = 60000L;

    /** Signature algorithm used both in the user name and for signing the password. */
    private static final String SIGN_METHOD = "hmacsha1";

    /**
     * Worker pool for message handling: core size equals the number of available processors,
     * maximum size is twice that, backed by a bounded queue.
     */
    private static final ExecutorService executorService = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2,
            WORKER_KEEP_ALIVE_SECONDS,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(WORKER_QUEUE_CAPACITY));

    /**
     * Listener attached to every consumer. It only enqueues the message on the worker pool so the
     * JMS delivery thread is not blocked by processing; a failed submission (for example a full
     * queue) is logged and the message is not processed.
     */
    public static final MessageListener messageListener = message -> {
        try {
            executorService.submit(() -> {
                processMessage(message);
            });
        } catch (Exception e) {
            LOG.error("submit task occurs exception ", e);
        }
    };

    /** Connection listener registered on every connection; every callback is a no-op. */
    public static final JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
        @Override
        public void onConnectionEstablished(URI uri) {
            // intentionally empty
        }

        @Override
        public void onConnectionFailure(Throwable th) {
            // intentionally empty
        }

        @Override
        public void onConnectionInterrupted(URI uri) {
            // intentionally empty
        }

        @Override
        public void onConnectionRestored(URI uri) {
            // intentionally empty
        }

        @Override
        public void onInboundMessage(JmsInboundMessageDispatch jmsInboundMessageDispatch) {
            // intentionally empty
        }

        @Override
        public void onSessionClosed(Session session, Throwable th) {
            // intentionally empty
        }

        @Override
        public void onConsumerClosed(MessageConsumer messageConsumer, Throwable th) {
            // intentionally empty
        }

        @Override
        public void onProducerClosed(MessageProducer messageProducer, Throwable th) {
            // intentionally empty
        }
    };

    /**
     * Opens {@code connectionCount} consumer connections to {@code amqps://<host>:5671} and
     * registers {@link #messageListener} on the {@code default} queue of each one.
     *
     * <p>For every connection the user name is built from the client id prefix, the connection
     * index and the authentication parameters, and the password is the HMAC signature of
     * {@code authId=<authId>&timestamp=<timestamp>}. If any connection fails to set up, the error
     * is logged, the half-initialised connection is closed and the method returns immediately;
     * connections opened in earlier iterations are left running. After all connections are set up
     * the calling thread sleeps for 60 seconds before returning.
     *
     * @param authId          value sent as {@code authId} and included in the signed content
     * @param signSecret      secret key used to sign the password
     * @param consumerGroupId value sent as {@code consumerGroupId}
     * @param iotInstanceId   value sent as {@code iotInstanceId}
     * @param clientIdPrefix  prefix of the client id; the connection index is appended to it
     * @param host            broker host name (port 5671 is always used)
     * @param connectionCount number of connections to open; must not be {@code null}
     */
    public static void amqpConnect(String authId, String signSecret, String consumerGroupId, String iotInstanceId, String clientIdPrefix, String host, Integer connectionCount) {
        for (int i = 0; i < connectionCount.intValue(); i++) {
            Connection connection = null;
            try {
                long timestamp = System.currentTimeMillis();
                String userName = clientIdPrefix + "-" + i + "|authMode=aksign,signMethod=hmacsha1,timestamp=" + timestamp + ",authId=" + authId + ",iotInstanceId=" + iotInstanceId + ",consumerGroupId=" + consumerGroupId + AmqpSupport.SUB_NAME_DELIMITER;
                String password = doSign("authId=" + authId + "&timestamp=" + timestamp, signSecret, SIGN_METHOD);

                // JNDI environment understood by the Qpid JMS initial context factory.
                Hashtable<String, String> environment = new Hashtable<>();
                environment.put("connectionfactory.SBCF", "failover:(amqps://" + host + ":5671?amqp.idleTimeout=80000)?failover.reconnectDelay=30");
                environment.put("queue.QUEUE", "default");
                environment.put("java.naming.factory.initial", "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
                InitialContext initialContext = new InitialContext(environment);
                ConnectionFactory connectionFactory = (ConnectionFactory) initialContext.lookup("SBCF");
                Destination destination = (Destination) initialContext.lookup("QUEUE");

                connection = connectionFactory.createConnection(userName, password);
                ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                connection.start();
                session.createConsumer(destination).setMessageListener(messageListener);
                LOG.info("amqp is start successfully!");
            } catch (Exception e) {
                // Keep the full stack trace; the message alone may be null or uninformative.
                LOG.error("amqp connect occurs exception, connection index " + i, e);
                closeQuietly(connection);
                return;
            }
        }
        try {
            Thread.sleep(POST_CONNECT_WAIT_MILLIS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can react to it.
            Thread.currentThread().interrupt();
            LOG.warn("interrupted while waiting after amqp connect", e);
        }
    }

    /**
     * Converts a JMS message into an {@link AmqpDTO} holding its body (decoded as UTF-8 text)
     * and its {@code topic} string property.
     *
     * @param message the received message
     * @return the populated DTO, or {@code null} if the message could not be read (the error is logged)
     */
    public static AmqpDTO processMessage(Message message) {
        try {
            byte[] body = message.getBody(byte[].class);
            AmqpDTO amqpDTO = new AmqpDTO();
            amqpDTO.setMessage(new String(body, StandardCharsets.UTF_8));
            amqpDTO.setTopic(message.getStringProperty("topic"));
            return amqpDTO;
        } catch (Exception e) {
            LOG.error("processMessage occurs error ", e);
            return null;
        }
    }

    /**
     * Computes the Base64-encoded MAC of {@code toSignString}.
     *
     * @param toSignString content to sign; encoded as UTF-8
     * @param secret       signing key; encoded as UTF-8
     * @param signMethod   JCA MAC algorithm name, e.g. {@code hmacsha1}
     * @return the Base64 encoding of the MAC bytes
     * @throws Exception if the algorithm is unknown or the key is rejected
     */
    public static String doSign(String toSignString, String secret, String signMethod) throws Exception {
        SecretKeySpec secretKeySpec = new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), signMethod);
        Mac mac = Mac.getInstance(signMethod);
        mac.init(secretKeySpec);
        return Base64.encodeBase64String(mac.doFinal(toSignString.getBytes(StandardCharsets.UTF_8)));
    }

    /** Closes a connection that failed during setup, logging instead of propagating close errors. */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception closeError) {
            LOG.warn("failed to close amqp connection after setup error", closeError);
        }
    }
}
