/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.cli.app;

import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import java.util.ArrayList;
import java.util.Objects;
import java.util.function.BiConsumer;
import javax.annotation.PostConstruct;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.cli.app.AbstractApplicationClient;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.util.MessageHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

@Component
@Profile(value={"receiver"})
public class Receiver
extends AbstractApplicationClient {
    private static final String TYPE_TELEMETRY = "telemetry";
    private static final String TYPE_EVENT = "event";
    private static final String TYPE_ALL = "all";
    @Value(value="${message.type}")
    protected String messageType;
    private BiConsumer<String, Message> messageHandler = (endpoint, msg) -> this.handleMessage(endpoint, msg);

    void setMessageHandler(BiConsumer<String, Message> messageHandler) {
        this.messageHandler = Objects.requireNonNull(messageHandler);
    }

    @PostConstruct
    Future<CompositeFuture> start() {
        return this.clientFactory.connect().compose(con -> {
            this.clientFactory.addReconnectListener(arg_0 -> this.createConsumer(arg_0));
            return this.createConsumer(con);
        }).onComplete(arg_0 -> this.handleCreateConsumerStatus(arg_0));
    }

    private CompositeFuture createConsumer(HonoConnection connection) {
        Handler closeHandler = closeHook -> {
            this.log.info("close handler of consumer is called");
            this.vertx.setTimer((long)this.connectionRetryInterval, reconnect -> {
                this.log.info("attempting to re-open the consumer link ...");
                this.createConsumer(connection);
            });
        };
        ArrayList<Future> consumerFutures = new ArrayList<Future>();
        if (this.messageType.equals(TYPE_EVENT) || this.messageType.equals(TYPE_ALL)) {
            consumerFutures.add(this.clientFactory.createEventConsumer(this.tenantId, msg -> this.messageHandler.accept(TYPE_EVENT, msg), closeHandler));
        }
        if (this.messageType.equals(TYPE_TELEMETRY) || this.messageType.equals(TYPE_ALL)) {
            consumerFutures.add(this.clientFactory.createTelemetryConsumer(this.tenantId, msg -> this.messageHandler.accept(TYPE_TELEMETRY, msg), closeHandler));
        }
        if (consumerFutures.isEmpty()) {
            consumerFutures.add(Future.failedFuture((String)String.format("Invalid message type [\"%s\"]. Valid types are \"telemetry\", \"event\" or \"all\"", this.messageType)));
        }
        return CompositeFuture.all(consumerFutures);
    }

    private void handleMessage(String endpoint, Message msg) {
        String deviceId = MessageHelper.getDeviceId((Message)msg);
        Buffer payload = MessageHelper.getPayload((Message)msg);
        this.log.info("received {} message [device: {}, content-type: {}]: {}", new Object[]{endpoint, deviceId, msg.getContentType(), payload});
        if (msg.getApplicationProperties() != null) {
            this.log.info("... with application properties: {}", (Object)msg.getApplicationProperties().getValue());
        }
    }

    private void handleCreateConsumerStatus(AsyncResult<CompositeFuture> startup) {
        if (startup.succeeded()) {
            this.log.info("Receiver [tenant: {}, mode: {}] created successfully, hit ctrl-c to exit", (Object)this.tenantId, (Object)this.messageType);
        } else {
            this.log.error("Error occurred during initialization of receiver: {}", (Object)startup.cause().getMessage());
            this.vertx.close();
        }
    }
}

