package io.floodplain.streams.remotejoin;

import io.floodplain.immutable.api.ImmutableMessage;
import io.floodplain.immutable.factory.ImmutableFactory;
import io.floodplain.replication.api.ReplicationMessage;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/floodplain/streams/remotejoin/StoreStateProcessor.class */
/**
 * Processor that writes the payload of every incoming {@link ReplicationMessage} into a
 * key/value state store and then forwards the record downstream.
 *
 * <p>The key used for both the store entry and the forwarded record is derived from the message
 * (and its parameter message) by the optional key extractor. When no extractor was supplied,
 * every record is stored and forwarded under the fixed key {@link #COMMONKEY}. The key of the
 * incoming record is not consulted.
 */
public class StoreStateProcessor extends AbstractProcessor<String, ReplicationMessage> {
    /** Key used for every record when no key extractor is configured. */
    public static final String COMMONKEY = "singlerestore";

    private static final Logger logger = LoggerFactory.getLogger(StoreStateProcessor.class);

    // Kept from the constructor; not read anywhere inside this class.
    private final String name;
    private final Function<ImmutableMessage, ImmutableMessage> initial;

    private final String lookupStoreName;
    private final Optional<BiFunction<ImmutableMessage, ImmutableMessage, String>> keyExtractor;

    // Resolved from the processor context in init(), hence not final.
    private KeyValueStore<String, ImmutableMessage> lookupStore;

    /**
     * Creates the processor.
     *
     * @param name            stored on the instance; not used by this class itself
     * @param lookupStoreName name under which the state store is requested from the processor
     *                        context during {@link #init(ProcessorContext)}
     * @param initial         stored on the instance; not used by this class itself
     * @param keyExtractor    optional function computing the store/forward key from the message
     *                        and its parameter message; when empty, {@link #COMMONKEY} is used
     */
    public StoreStateProcessor(
            String name,
            String lookupStoreName,
            Function<ImmutableMessage, ImmutableMessage> initial,
            Optional<BiFunction<ImmutableMessage, ImmutableMessage, String>> keyExtractor) {
        this.name = name;
        this.lookupStoreName = lookupStoreName;
        this.initial = initial;
        this.keyExtractor = keyExtractor;
    }

    /**
     * Fetches the state store named at construction time from the context, then delegates to the
     * superclass initialisation.
     *
     * @param processorContext context supplied by the streams runtime
     */
    public void init(ProcessorContext processorContext) {
        // The store is looked up before super.init(), matching the established call order.
        this.lookupStore = processorContext.getStateStore(this.lookupStoreName);
        super.init(processorContext);
    }

    /**
     * Stores the message payload under the derived key and forwards the record, marked with
     * operation {@code UPDATE}, under that same key.
     *
     * @param key                key of the incoming record; ignored
     * @param replicationMessage incoming record whose payload is stored and forwarded
     */
    public void process(String key, ReplicationMessage replicationMessage) {
        BiFunction<ImmutableMessage, ImmutableMessage, String> extractor =
                this.keyExtractor.orElse(StoreStateProcessor::commonKey);

        ImmutableMessage payload = replicationMessage.message();
        // A missing parameter message is replaced by an empty message before key extraction.
        ImmutableMessage parameters =
                (ImmutableMessage) replicationMessage.paramMessage().orElse(ImmutableFactory.empty());
        String derivedKey = extractor.apply(payload, parameters);

        this.lookupStore.put(derivedKey, replicationMessage.message());
        context().forward(
                derivedKey,
                replicationMessage.withOperation(ReplicationMessage.Operation.UPDATE));
    }

    /** Fallback key extractor: disregards both messages and yields {@link #COMMONKEY}. */
    private static String commonKey(ImmutableMessage message, ImmutableMessage paramMessage) {
        return COMMONKEY;
    }
}
