Part 1. Twitter graph with HBase, Tinkerpop and twitter4j

Mathieu Guglielmino
6 min readApr 29, 2018

Today we’ll look into the creation of a Twitter graph, using the capabilities of the distributed storage system of Hadoop, HBase, and the java implementation of the Twitter API : twitter4j.

Prerequisites : Java and an IDE (preferably IntelliJ), HBase on a cluster or on local mode.

Part 1 : The graph schema

First thing first, let’s dive into the schema of the graph. The interested reader can look into the details of the Twitter API to better understand the User, Tweet and Entities object, which are implemented in Twitter4j.

Graphs databases are of great importance today and we can mention among the most popular the neo4j, Janusgraph (built on top of HBase), … I won’t dive into details which are beyond the scope of this article, but the main implementations provide ways to define vertices, edges and properties. In the following, I will use HGraphDB which is “HBase as a TinkerPop Graph Database”.

We will create the database in two steps : first the nodes, in a CreateGraphNodes class, and the edges in a CreateGraphEdges class. The schema is given by the figure above, on which we pretty much spot 3 categories of data : 1/ Data enrichment using NLP tools (entities recognition, topic detection, …) 2/ The interaction with other tweets 3/ The entities in the text of the tweet. Since much of the properties of self-explanatory, I just precise that “followers_at_time” and such keep in memory a few useful statistics about the user at the time of the post.

Configuration cfg = new HBaseGraphConfiguration()
.setInstanceType(InstanceType.DISTRIBUTED)
.setGraphNamespace("tweets_graph")
.setCreateTables(true)
.set("hbase.zookeeper.quorum", "127.0.0.1")
.set("zookeeper.znode.parent", "/hbase")
.setUseSchema(true);
HBaseGraph graph = HBaseGraph.open(cfg);

This code will be used to connect to the “tweets_graph” database. To specify the creation of a vertex, we will use the the createLabel function of the HBaseGraph class. We precise the type ( ElementType.VERTEX or ElementType.EDGE) and the pairs of key/value.

graph.createLabel(ElementType.VERTEX, "user", ValueType.LONG,
"id", ValueType.LONG,
"user_key", ValueType.STRING,
"screen_name", ValueType.STRING,
"created_at", ValueType.LONG,
"created_str", ValueType.STRING,
"favourites_count", ValueType.INT,
"followers_count", ValueType.INT,
"listed_count", ValueType.INT,
"friends_count", ValueType.INT,
"statuses_count", ValueType.INT,
"lang", ValueType.STRING,
"time_zone", ValueType.STRING,
"verified", ValueType.BOOLEAN,
"description", ValueType.STRING,
"location", ValueType.STRING);

We can do the same for the tweet, url, hashtag, source and person, location, organization vertices. Concerning the edges, we create the labels and then connect them to their corresponding vertices :

graph.createLabel(ElementType.EDGE, "RETWEETED_STATUS", ValueType.STRING);
graph.createLabel(ElementType.EDGE, "QUOTED_STATUS", ValueType.STRING);
graph.createLabel(ElementType.EDGE, "IN_REPLY_TO", ValueType.STRING);
graph.createLabel(ElementType.EDGE, "HAS_LINK", ValueType.STRING);
graph.createLabel(ElementType.EDGE, "MENTIONS", ValueType.STRING);
graph.createLabel(ElementType.EDGE, "HAS_TAG", ValueType.STRING);
graph.createLabel(ElementType.EDGE, "POSTED", ValueType.STRING);
graph.createLabel(ElementType.EDGE, "POSTED_VIA", ValueType.STRING);
graph.connectLabels("tweet", "RETWEETED_STATUS", "tweet");
graph.connectLabels("tweet", "QUOTED_STATUS", "tweet");
graph.connectLabels("tweet", "IN_REPLY_TO", "tweet");
graph.connectLabels("tweet", "HAS_LINK", "url");
graph.connectLabels("tweet", "MENTIONS", "user");
graph.connectLabels("tweet", "IN_REPLY_TO", "user");
graph.connectLabels("tweet", "HAS_TAG", "hashtag");
graph.connectLabels("user", "POSTED", "tweet");
graph.connectLabels("tweet", "POSTED_VIA", "source");

graph.connectLabels("tweet", "MENTIONS", "person");
graph.connectLabels("tweet", "MENTIONS", "organization");
graph.connectLabels("tweet", "MENTIONS", "location");

Don’t forget to close the graph with graph.close().

And ? And that’s it ! These few lines define the schema of the database, ready to welcome the tweets to come ;)

Part 2 : Streaming the tweets

The twitter4j package provides a lot of useful functions to deal with tweets. First, we instantiate the connection, using the useful ParameterTool of flink :

final ParameterTool params = ParameterTool.fromArgs(args);
//System.out.println("Usage: TwitterExample [--output <path>] " +
// "[--twitter-source.consumerKey <key> --twitter-source.consumerSecret <secret> --twitter-source.token <token> --twitter-source.tokenSecret <tokenSecret>]");

String CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_TOKEN_SECRET;
CONSUMER_KEY = params.get("twitter-source.consumerKey");
CONSUMER_SECRET = params.get("twitter-source.consumerSecret");
ACCESS_TOKEN = params.get("twitter-source.token");
ACCESS_TOKEN_SECRET = params.get("twitter-source.tokenSecret");

ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true)
.setOAuthConsumerKey(CONSUMER_KEY)
.setOAuthConsumerSecret(CONSUMER_SECRET)
.setOAuthAccessToken(ACCESS_TOKEN)
.setOAuthAccessTokenSecret(ACCESS_TOKEN_SECRET);
twitter4j.conf.Configuration twitter_cfg = cb.build();

Twitter twitter = new TwitterFactory(twitter_cfg).getInstance();

The consumer key, secret and tokens can be found on apps.twitter.com (create one if it does not already exist).

Once the Twitter twitter exists, we can start the streaming process by rewriting the StatusListener class :

StatusListener listener = new StatusListener(){
@Override
public void onStatus(Status status) {
System.out.println("@" + status.getUser() + ":" + status.getText());
}
};

TwitterStream twitterStream = new TwitterStreamFactory(cb).getInstance();
FilterQuery tweetFilterQuery = new FilterQuery();
tweetFilterQuery.track(new String[]{"bieber"});

twitterStream.addListener(listener);
twitterStream.filter(tweetFilterQuery);

I skipped them for clarity but you can rewrite the onException, onDeletionNotice, onScrubGeo, onStallWarning and onTrackLimitationNotice. The main function that will interest us here is onStatus, which receives an element of the Status class. The class implements all the function you could expect.

The lines after the StatusListener define the twitterStream and add them a query to filter the tweets.

As simple as that! As you could expect, we will implement in the onStatus function the way we write the data into our graph database.

Part 3 : Saving the tweets into the database

To do that, we will write a custom TwitterToGraph that will be constructed with a Twitter and HBaseGraph objects. To save all the useful information, we’ll need the following functions : getOrCreateTweet, getOrCreateUser, getOrCreateMention, getOrCreateHashtag, getOrCreateUrl, getOrCreateSource, and a createEdgesFromTweet which will link the vertices between them.

public Vertex getOrCreateTweet(Status status) {
System.out.printf("%s - %s : %s\n", status.getId(), status.getUser().getScreenName(), status.getText());

GraphTraversal<Vertex, Vertex> vt = g.traversal().V().has("id", status.getId());
if (vt.hasNext()) {
g.traversal().V(status.getId()).property("favourites_count", status.getFavoriteCount(),
"retweets_count", status.getRetweetCount());

System.out.println("Tweet data updated.");
return vt.next();
}
else {
User user = status.getUser();

Vertex tweet = g.addVertex(T.id, status.getId(), T.label, "tweet",
"id", status.getId(),
"created_str", status.getCreatedAt().toString(),
"created_at", status.getCreatedAt().getTime()/1000,
"text", status.getText(),
"favourites_count", status.getFavoriteCount(),
"retweets_count", status.getRetweetCount(),
"retweeted", status.isRetweeted(),
"followers_at_time", status.getUser().getFollowersCount(),
"friends_at_time", user.getFriendsCount(),
"statuses_at_time", user.getStatusesCount(),
"listed_at_time", user.getListedCount());
System.out.println("Tweet created!");
return tweet;
}
}

The getOrCreateTweet is interesting on several points. First, it checks if the user already exists in the database with a traversal. If it does, we update the number of retweets/favorites. If you are not familiar with this notion nor with the TinkerPop framework, I strongly advise you to look into their getting started which covers the basics.

Just to precise a bit what the line does, the g.traversal() creates what’s called a traversal (imagine a little gremlin walking on your graph), and .V().has("id", status.getId()) check if the iterator on the Vertices that have the corresponding id has a next element ( vt.hasNext() ). If it does, then the element already exists and we update it, otherwise we will create it.

The creation of the vertex is pretty straight forward, with a simple g.addVertex. Let’s just note the “created_at” property, which is status.getCreatedAt().getTime()/1000. In fact, getTime() returns the number of milliseconds since epoch (1st of January 1970), and we are more interested in the number of seconds.

I won’t dive into the other getOrCreate functions, you can look up into my github. Though, let’s look at createEdgesFromTweets.

getOrCreateUser(tweet.getUser()).addEdge("POSTED", getOrCreateTweet(tweet));
status.addEdge("POSTED_VIA", getOrCreateSource(tweet.getSource()));

// Hashtags, mentions, url
for (HashtagEntity hashtag : tweet.getHashtagEntities()) status.addEdge("HAS_TAG",
getOrCreateHashtag(hashtag));
for (UserMentionEntity u : tweet.getUserMentionEntities()) status.addEdge("MENTIONS",
getOrCreateMention(u));
for (URLEntity url : tweet.getURLEntities()) status.addEdge("HAS_LINK",
getOrCreateUrl(url));

// Quoted, retweeted, in reply to
if (tweet.getInReplyToStatusId() != -1) {
Status in_reply_to = twitter.showStatus(tweet.getInReplyToStatusId());
status.addEdge("IN_REPLY_TO",
getOrCreateUser(in_reply_to.getUser()));
}
if (tweet.getQuotedStatusId() != -1) status.addEdge("QUOTED_STATUS",
getOrCreateTweet(tweet.getQuotedStatus()));
if (tweet.isRetweet()) status.addEdge("RETWEETED_STATUS",
getOrCreateTweet(tweet.getRetweetedStatus()));

The function does exactly what is expected of it. Especially, you can get the hashtags in the tweets with the tweet.getHashtagEntities (idem for urls, mentions, …). We also check if the status is a retweeted or a quoted status.

With these different functions, we can then modify the StatusListener accordingly. We initialize the TwitterToGraph class with TwitterToGraph tw = new TwitterToGraph(twitter, graph) where twitter is a Twitter object and graph a HBaseGraph object.

@Override
public void onStatus(Status status) {
try {
Vertex tweet;
tweet = tw.getOrCreateTweet(status);
tw.createEdgesFromTweet(tweet, status);
System.out.println("Status written in database.");
} catch (Exception e) {
e.printStackTrace();
}
}

Part 4 : What’s next

No, Nora, not nothing is next.

A few things may be to come related to this subject. In particular, I never really explained what the 3 vertices person, organization and location where, and that’s because I plan on developing some NLP (Natural Language Processing) tools to do that, especially entities recognition.

Of course, since we now have at our disposal a TinkerPop graph, it will be the perfect occasion to do some analysis on it and explore a bit what we can do on graphs.

Stay tuned!

--

--