package eu.dicodeproject.twitterstream.sink;

import eu.dicodeproject.analysis.hbase.TweetCols;
import eu.dicodeproject.analysis.util.Language;
import eu.dicodeproject.twitterstream.vectorize.Vectorizer;
import java.io.IOException;
import java.net.URL;
import java.util.Map;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.GeoLocation;
import twitter4j.HashtagEntity;
import twitter4j.Place;
import twitter4j.Status;
import twitter4j.URLEntity;
import twitter4j.User;
import twitter4j.UserMentionEntity;

/* loaded from: input_file:eu/dicodeproject/twitterstream/sink/HBaseTweetSink.class */
/**
 * {@link TweetSink} that writes tweets and their authors to two HBase tables.
 *
 * <p>Tweets are stored under the row key {@code Bytes.toBytes(status.getId())} in the tweet
 * table, users under {@code Bytes.toBytes(user.getId())} in the user table. Both tables are
 * created by {@link #init()} when they do not exist yet. When a {@link Vectorizer} has been
 * set, the term vector of the tweet text is written to the vector column family of the tweet
 * row, one column per term.
 *
 * <p>The table names and column families must be set through the setters before
 * {@link #init()} runs.
 */
public class HBaseTweetSink implements TweetSink {
    private static final Logger log = LoggerFactory.getLogger(HBaseTweetSink.class);

    /** Size handed to the {@link HTablePool} constructor. */
    private static final int TABLE_POOL_SIZE = 100;

    /** A progress line is logged every time this many tweets have been stored. */
    private static final long LOG_INTERVAL = 100000L;

    /**
     * Number of tweets stored so far; static, hence shared by all instances.
     * NOTE(review): updated without synchronization - confirm whether store() is ever called
     * from several threads.
     */
    private static long counter;

    private byte[] tweetFamily;
    private byte[] vectorFamily;
    private byte[] userFamily;
    private String tweetTableName;
    private String userTableName;
    private String zookeeperQuorum = "localhost";
    private int zookeeperPort = 2181;
    private Configuration configuration;
    private HTablePool hTablePool;
    private Vectorizer vectorizer;

    /**
     * Builds the HBase configuration (unless one was injected through
     * {@link #setHBaseConfiguration(Configuration)}), creates the tweet and user tables if
     * they are missing and sets up the table pool used by {@link #store(Status)}.
     *
     * @throws IOException if HBase cannot be reached or a table cannot be created
     */
    @PostConstruct
    public void init() throws IOException {
        if (this.configuration == null) {
            this.configuration = new Configuration();
            // Start from an empty configuration so only the two ZooKeeper settings apply.
            this.configuration.clear();
            this.configuration.set("hbase.zookeeper.quorum", this.zookeeperQuorum);
            this.configuration.setInt("hbase.zookeeper.property.clientPort", this.zookeeperPort);
        }
        HBaseAdmin hBaseAdmin = new HBaseAdmin(this.configuration);
        initTable(this.tweetTableName, hBaseAdmin, this.tweetFamily, this.vectorFamily);
        initTable(this.userTableName, hBaseAdmin, this.userFamily);
        this.hTablePool = new HTablePool(this.configuration, TABLE_POOL_SIZE);
    }

    /**
     * Closes the pooled tables of both the user and the tweet table. Does nothing when
     * {@link #init()} never got as far as creating the pool.
     */
    @PreDestroy
    public void shutdown() {
        if (this.hTablePool == null) {
            return;
        }
        this.hTablePool.closeTablePool(this.userTableName);
        this.hTablePool.closeTablePool(this.tweetTableName);
    }

    /**
     * Creates the table with the given column families unless a table of that name exists.
     *
     * @param tableName name of the table to create
     * @param hBaseAdmin admin handle used to look up and create tables
     * @param families column families of the new table
     * @throws IOException if listing or creating tables fails
     */
    private void initTable(String tableName, HBaseAdmin hBaseAdmin, byte[]... families) throws IOException {
        if (checkTable(tableName, hBaseAdmin)) {
            return;
        }
        HTableDescriptor descriptor = new HTableDescriptor(tableName);
        for (byte[] family : families) {
            descriptor.addFamily(new HColumnDescriptor(family));
            if (log.isInfoEnabled()) {
                log.info("Creating table " + tableName + " with column family " + Bytes.toString(family)
                        + " as non were found.");
            }
        }
        hBaseAdmin.createTable(descriptor);
    }

    /**
     * Tells whether a table with the given name is among the tables listed by the admin.
     *
     * @param tableName name to look for
     * @param hBaseAdmin admin handle used to list the tables
     * @return {@code true} if a table with that name exists
     * @throws IOException if the tables cannot be listed
     */
    private boolean checkTable(String tableName, HBaseAdmin hBaseAdmin) throws IOException {
        for (HTableDescriptor existing : hBaseAdmin.listTables()) {
            if (tableName.equals(existing.getNameAsString())) {
                return true;
            }
        }
        return false;
    }

    /** Adds a string column to the put; {@code null} values are skipped. */
    private void add(byte[] family, byte[] qualifier, String value, Put put) {
        if (value != null) {
            put.add(family, qualifier, Bytes.toBytes(value));
        }
    }

    /** Adds a long column to the put. */
    private void add(byte[] family, byte[] qualifier, long value, Put put) {
        put.add(family, qualifier, Bytes.toBytes(value));
    }

    /** Adds an int column to the put. */
    private void add(byte[] family, byte[] qualifier, int value, Put put) {
        put.add(family, qualifier, Bytes.toBytes(value));
    }

    /** Adds a double column to the put. */
    private void add(byte[] family, byte[] qualifier, double value, Put put) {
        put.add(family, qualifier, Bytes.toBytes(value));
    }

    /** Returns {@code value.toString()}, or {@code null} when the value itself is {@code null}. */
    private static String asString(Object value) {
        return value == null ? null : value.toString();
    }

    /**
     * Stores the tweet and its author. Any exception raised while building or writing the
     * rows is caught and logged together with the tweet, so it does not reach the caller.
     *
     * @param status the tweet to store
     * @throws IOException declared by {@link TweetSink}
     */
    @Override
    public void store(Status status) throws IOException {
        // Each table is handed back to the pool exactly once, whether or not processing
        // fails; the user table is returned first, then the tweet table.
        HTableInterface tweetTable = this.hTablePool.getTable(this.tweetTableName);
        try {
            HTableInterface userTable = this.hTablePool.getTable(this.userTableName);
            try {
                storeTweetData(status, tweetTable);
                storeUserData(status, userTable);
                counter++;
                if (counter % LOG_INTERVAL == 0) {
                    log.info(counter + " since restart / 100,000 tweets stored");
                }
            } catch (Exception e) {
                log.warn("Error processing tweet: " + e.getMessage() + " (Ignoring and continuing processing - be alarmed if that happens often for your desired definition of often.) ", e);
                log.warn("Tweet that let to the exception: " + status);
            } finally {
                this.hTablePool.putTable(userTable);
            }
        } finally {
            this.hTablePool.putTable(tweetTable);
        }
    }

    /**
     * Writes the author of the tweet to the user table. Nothing is written when the status
     * carries no user.
     *
     * @param status tweet whose user is stored
     * @param userTable table receiving the user row
     * @throws IOException if the put fails
     */
    private void storeUserData(Status status, HTableInterface userTable) throws IOException {
        User user = status.getUser();
        if (user == null) {
            return;
        }
        Put put = new Put(Bytes.toBytes(user.getId()));
        // The profile image URL is written to two columns (see USER_PROFILE_IMAGE_URL below).
        add(this.userFamily, TweetCols.IMAGE_URL.bytes(), asString(user.getProfileImageURL()), put);
        add(this.userFamily, TweetCols.LANG.bytes(), user.getLang(), put);
        if (user.getCreatedAt() != null) {
            add(this.userFamily, TweetCols.USER_CREATED_AT.bytes(), user.getCreatedAt().getTime(), put);
        }
        add(this.userFamily, TweetCols.USER_DESCRIPTION.bytes(), user.getDescription(), put);
        add(this.userFamily, TweetCols.USER_FAVOURITES_COUNT.bytes(), user.getFavouritesCount(), put);
        add(this.userFamily, TweetCols.USER_FOLLOWERS_COUNT.bytes(), user.getFollowersCount(), put);
        add(this.userFamily, TweetCols.USER_FRIENDS_COUNT.bytes(), user.getFriendsCount(), put);
        // The listed count is only written when it is positive.
        if (user.getListedCount() > 0) {
            add(this.userFamily, TweetCols.USER_LISTED_COUNT.bytes(), user.getListedCount(), put);
        }
        add(this.userFamily, TweetCols.USER_LOCATION.bytes(), user.getLocation(), put);
        add(this.userFamily, TweetCols.USER_NAME.bytes(), user.getName(), put);
        add(this.userFamily, TweetCols.USER_PROFILE_BACKGROUND_COLOR.bytes(), user.getProfileBackgroundColor(), put);
        add(this.userFamily, TweetCols.USER_PROFILE_BACKGROUND_IMAGE_URL.bytes(), user.getProfileBackgroundImageUrl(), put);
        add(this.userFamily, TweetCols.USER_PROFILE_IMAGE_URL.bytes(), asString(user.getProfileImageURL()), put);
        add(this.userFamily, TweetCols.USER_PROFILE_LINK_COLOR.bytes(), user.getProfileLinkColor(), put);
        add(this.userFamily, TweetCols.USER_PROFILE_SIDEBAR_BORDER_COLOR.bytes(), user.getProfileSidebarBorderColor(), put);
        add(this.userFamily, TweetCols.USER_PROFILE_SIDEBAR_FILL_COLOR.bytes(), user.getProfileSidebarFillColor(), put);
        add(this.userFamily, TweetCols.USER_PROFILE_TEXT_COLOR.bytes(), user.getProfileTextColor(), put);
        add(this.userFamily, TweetCols.USER_SCREEN_NAME.bytes(), user.getScreenName(), put);
        add(this.userFamily, TweetCols.USER_STATUSES_COUNT.bytes(), user.getStatusesCount(), put);
        add(this.userFamily, TweetCols.USER_TIMEZONE.bytes(), user.getTimeZone(), put);
        add(this.userFamily, TweetCols.USER_URL.bytes(), asString(user.getURL()), put);
        add(this.userFamily, TweetCols.USER_UTC_OFFSET.bytes(), user.getUtcOffset(), put);
        userTable.put(put);
    }

    /**
     * Writes the tweet itself to the tweet table. A retweeted status, if present, is stored
     * through {@link #store(Status)} before the row of this tweet is written.
     *
     * @param status tweet to store
     * @param tweetTable table receiving the tweet row
     * @throws IOException if the put fails
     */
    private void storeTweetData(Status status, HTableInterface tweetTable) throws IOException {
        Put put = new Put(Bytes.toBytes(status.getId()));
        // Language code handed to the vectorizer; stays empty when the tweet has no user.
        String languageCode = "";
        User user = status.getUser();
        if (user != null) {
            add(this.tweetFamily, TweetCols.FROM.bytes(), user.getName(), put);
            add(this.tweetFamily, TweetCols.FROM_ID.bytes(), user.getId(), put);
            languageCode = user.getLang();
            add(this.tweetFamily, TweetCols.LANG.bytes(), languageCode, put);
        }
        if (status.getCreatedAt() != null) {
            add(this.tweetFamily, TweetCols.CREATION_DATE.bytes(), status.getCreatedAt().getTime(), put);
        }
        // The reply-to status id is stored as its decimal string, not as a binary number.
        if (status.getInReplyToStatusId() > 0) {
            add(this.tweetFamily, TweetCols.IN_REPLY_TO_STATUS_ID.bytes(), "" + status.getInReplyToStatusId(), put);
        }
        add(this.tweetFamily, TweetCols.SOURCE.bytes(), status.getSource(), put);
        String text = status.getText();
        add(this.tweetFamily, TweetCols.TEXT.bytes(), text, put);
        if (this.vectorizer != null) {
            this.vectorizer.setLanguage(Language.fromCode(languageCode));
            // One column per term in the vector family, holding the int value of the entry.
            for (Map.Entry<String, Integer> term : this.vectorizer.createVector(text).entrySet()) {
                add(this.vectorFamily, Bytes.toBytes(term.getKey()), term.getValue().intValue(), put);
            }
        }
        // Like the reply-to status id, the reply-to user id is stored as a decimal string.
        if (status.getInReplyToUserId() > 0) {
            add(this.tweetFamily, TweetCols.TO.bytes(), "" + status.getInReplyToUserId(), put);
        }
        addHashtags(status, put);
        addLocationData(status, put);
        addFirstUrl(status, put);
        addUsersMentioned(status, put);
        if (status.getRetweetCount() > 0) {
            add(this.tweetFamily, TweetCols.RETWEET_COUNT.bytes(), status.getRetweetCount(), put);
        }
        if (status.getRetweetedStatus() != null) {
            store(status.getRetweetedStatus());
        }
        tweetTable.put(put);
    }

    /**
     * Adds the ids and the names of all mentioned users as two {@code '#'}-prefixed
     * concatenations. Nothing is added when the tweet mentions nobody.
     */
    private void addUsersMentioned(Status status, Put put) {
        UserMentionEntity[] mentions = status.getUserMentionEntities();
        if (mentions == null || mentions.length == 0) {
            return;
        }
        StringBuilder ids = new StringBuilder();
        StringBuilder names = new StringBuilder();
        for (UserMentionEntity mention : mentions) {
            ids.append('#').append(mention.getId());
            names.append('#').append(mention.getName());
        }
        add(this.tweetFamily, TweetCols.USER_IDS_MENTIONED.bytes(), ids.toString(), put);
        add(this.tweetFamily, TweetCols.USER_NAMES_MENTIONED.bytes(), names.toString(), put);
    }

    /**
     * Adds all hashtags of the tweet as one string in which every tag is prefixed with
     * {@code '#'}. Nothing is added when the tweet has no hashtags.
     */
    private void addHashtags(Status status, Put put) {
        HashtagEntity[] hashtags = status.getHashtagEntities();
        if (hashtags == null || hashtags.length == 0) {
            return;
        }
        StringBuilder joined = new StringBuilder();
        for (HashtagEntity hashtag : hashtags) {
            joined.append('#').append(hashtag.getText());
        }
        add(this.tweetFamily, TweetCols.HASHTAGS.bytes(), joined.toString(), put);
    }

    /**
     * Adds the first URL of the tweet, preferring its expanded form over the plain one.
     * Nothing is added when the tweet has no URL entities or the first entity carries
     * neither form.
     */
    private void addFirstUrl(Status status, Put put) {
        URLEntity[] urlEntities = status.getURLEntities();
        if (urlEntities == null || urlEntities.length == 0) {
            return;
        }
        URLEntity first = urlEntities[0];
        URL url = first.getExpandedURL();
        if (url == null) {
            url = first.getURL();
        }
        // asString keeps a missing URL from aborting the whole tweet with a NullPointerException.
        add(this.tweetFamily, TweetCols.URL.bytes(), asString(url), put);
    }

    /**
     * Adds the geo location and place information of the tweet.
     *
     * <p>When the tweet has a place, a latitude/longitude pair is stored as well, taken from
     * the first usable source in this order: the first geometry coordinate of a place with
     * geometry type {@code "Point"}, the geo location of the tweet, the first bounding box
     * coordinate of a place with bounding box type {@code "Polygon"}.
     * NOTE(review): a tweet that has a geo location but no place gets no latitude/longitude
     * columns - confirm this is intended.
     */
    private void addLocationData(Status status, Put put) {
        GeoLocation geoLocation = status.getGeoLocation();
        if (geoLocation != null) {
            add(this.tweetFamily, TweetCols.GEO.bytes(), geoLocation.toString(), put);
        }
        Place place = status.getPlace();
        if (place == null) {
            return;
        }
        add(this.tweetFamily, TweetCols.PLACE_COUNTRY_CODE.bytes(), place.getCountryCode(), put);
        add(this.tweetFamily, TweetCols.PLACE_FULL_NAME.bytes(), place.getFullName(), put);
        add(this.tweetFamily, TweetCols.PLACE_TYPE.bytes(), place.getPlaceType(), put);

        GeoLocation position = null;
        if ("Point".equals(place.getGeometryType())) {
            position = firstCoordinate(place.getGeometryCoordinates());
        }
        if (position == null) {
            position = geoLocation;
        }
        if (position == null && "Polygon".equals(place.getBoundingBoxType())) {
            position = firstCoordinate(place.getBoundingBoxCoordinates());
        }
        if (position != null) {
            add(this.tweetFamily, TweetCols.PLACE_LATITUDE.bytes(), position.getLatitude(), put);
            add(this.tweetFamily, TweetCols.PLACE_LONGITUDE.bytes(), position.getLongitude(), put);
        }
    }

    /**
     * Returns element {@code [0][0]} of the coordinate array, or {@code null} when the array
     * or its first row is missing or empty.
     */
    private static GeoLocation firstCoordinate(GeoLocation[][] coordinates) {
        if (coordinates == null || coordinates.length == 0) {
            return null;
        }
        GeoLocation[] firstRow = coordinates[0];
        if (firstRow == null || firstRow.length == 0) {
            return null;
        }
        return firstRow[0];
    }

    /**
     * Sets the column family of the tweet columns. The name is encoded with
     * {@link Bytes#toBytes(String)}, like every other string this class writes.
     */
    public void setTweetFamily(String family) {
        this.tweetFamily = Bytes.toBytes(family);
    }

    /** Sets the column family that receives the term vector of the tweet text. */
    public void setTweetVectorsFamily(String family) {
        this.vectorFamily = Bytes.toBytes(family);
    }

    /** Sets the column family of the user columns. */
    public void setUserFamily(String family) {
        this.userFamily = Bytes.toBytes(family);
    }

    /** Sets the name of the tweet table; surrounding whitespace is removed. */
    public void setTweetTableName(String tableName) {
        this.tweetTableName = tableName.trim();
    }

    /** Sets the name of the user table; surrounding whitespace is removed. */
    public void setUserTableName(String tableName) {
        this.userTableName = tableName.trim();
    }

    /** Sets the value used for {@code hbase.zookeeper.quorum}; defaults to {@code localhost}. */
    public void setZookeeperQuorum(String quorum) {
        this.zookeeperQuorum = quorum;
    }

    /** Sets the value used for {@code hbase.zookeeper.property.clientPort}; defaults to 2181. */
    public void setZookeeperPort(int port) {
        this.zookeeperPort = port;
    }

    /** Sets the vectorizer applied to the tweet text; without one no term vector is stored. */
    public void setVectorizer(Vectorizer vectorizer) {
        this.vectorizer = vectorizer;
    }

    /**
     * Injects a ready-made HBase configuration. When set before {@link #init()}, the
     * ZooKeeper quorum and port of this sink are not applied.
     */
    void setHBaseConfiguration(Configuration configuration) {
        this.configuration = configuration;
    }
}
