
Spark GraphX Example with RDF data - Modelling IMDB data to a graph

I had some time to deep dive into GraphX, and since it is only supported in Scala, it turned out to be a good learning experience. Coming from experimenting with graph databases, GraphX seemed an approachable platform, and it frees you from worrying about sharding/partitioning your data. The GraphX paper and Bob DuCharme's blog were good starting points for playing with GraphX.
So I managed to get Indian movie data for 10 different languages (Hindi, Marathi, Punjabi, Telugu, Tamil, Bhojpuri, Kannada, Gujarati, Bengali, Malayalam), along with the actors and directors associated with the films.
Next came the part of building relationships between these entities. GraphX identifies each vertex in the graph by a VertexId, which is a Long (64-bit). So I generated VertexIds for all entities and substituted them into the movie file, to make parsing easier later when creating the edges (relationships). The part that makes GraphX easy and fun is how it handles partitioning: edges are partitioned across the cluster and a vertex is shipped along with the edges that reference it, but the VertexId associated with that vertex stays the same on every node. The essential syntax to consider is -

For Vertex -> RDD[(VertexId, VertexObject)]
In my case, the vertex RDD is -> RDD[(VertexId, (String, String))]

For Edge -> Edge(srcVertexId, dstVertexId, "<edgeValueString>")
You can also use a URI or any other data type as your edge value (for example, a weight on the edge, useful when you choose the destination based on the edge value).

Combined in this one line, these two create the graph object for us -
val graph = Graph(entities,relationships)
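
As a minimal sketch of the same pattern (assuming sc is an existing SparkContext; the names and VertexIds are taken from the example data further below):

import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// A made-up two-vertex example: an actor connected to a movie, with edges both ways.
val entities: RDD[(VertexId, (String, String))] = sc.parallelize(Seq(
  (75571848106L, ("shah rukh khan", "actor")),
  (112870L, ("dilwale dulhania le jayenge", "movie"))
))
val relationships: RDD[Edge[String]] = sc.parallelize(Seq(
  Edge(75571848106L, 112870L, "inFilm"),
  Edge(112870L, 75571848106L, "actor")
))
val graph = Graph(entities, relationships)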

                                                          [Figure: Graph depiction of the data]

So I ended up connecting each movie to all of its actors and back, to its director and back, and to the language the movie was in and back. This produced about half a million edges for my graph (in essence half as many relationships, since I created two edges, one in each direction, between a movie and each connected entity). I also added literal properties to the movies (which are not nodes), such as imdbRating and potentialAction (the IMDb web URL for that movie). Literal properties could instead be added to the vertex object directly if that object were a class with the properties as fields. The code to construct and query the graph is below. The main program could be replaced by a Scalatra web server serving the same queries, but I was more focused on the graph functionality.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.io.Source
import org.json4s._
import org.json4s.jackson.JsonMethods._
import scala.collection.mutable.ListBuffer

object MoviePropertyGraph {
  val baseURI = "http://kb.rootchutney.com/propertygraph#"

  class InitGraph(val entities:RDD[(VertexId, (String, String))], val relationships:RDD[Edge[String]],
                  val literalAttributes:RDD[(Long,(String,Any))]){
    val graph = Graph(entities,relationships)

    def getPropertyTriangles(): Unit ={
          // Output object property triples
       graph.triplets.foreach( t => println(
         s"<$baseURI${t.srcAttr._1}> <$baseURI${t.attr}> <$baseURI${t.dstAttr._1}> ."
       ))
    }

    def getLiteralPropertyTriples(): Unit = {
      entities.foreach(t => println(
        s"""<$baseURI${t._2._1}> <${baseURI}role> \"${t._2._2}\" ."""
      ))
    }

    def getConnectedComponents(): Unit = {
      val connectedComponents = graph.connectedComponents()
      graph.vertices.leftJoin(connectedComponents.vertices) {
        case(id, entity, component) => s"${entity._1} is in component ${component.get}"
      }.foreach{ case(id, str) => println(str) }
    }

    def getMoviesOfActor(): Unit = {
      println("Enter name of actor:")
      val actor = Console.readLine()
      graph.triplets.filter(triplet => (triplet.srcAttr._1.equals(actor)) && triplet.attr.equals("inFilm"))
        .foreach(t => println(t.dstAttr._1))
    }

    def getMoviesOfDirector(): Unit = {
      println("Enter name of director:")
      val director = Console.readLine()
      graph.triplets.filter(triplet => (triplet.srcAttr._1.equals(director)) && triplet.attr.equals("movie"))
        .foreach(t => println(t.dstAttr._1))
    }

    def getMoviesInLanguage(): Unit = {
      println("Enter language:")
      val lang = Console.readLine()
      graph.triplets.filter(triplet => (triplet.dstAttr._1.equals(lang)) && triplet.attr.equals("inLanguage"))
        .foreach(t => println(t.srcAttr._1))
    }

    def getTopMoviesBeginningWith(): Unit = {
      println("Enter prefix/starting tokens:")
      val prefix = Console.readLine()
      graph.vertices.filter(vertex => (vertex._2._1.startsWith(prefix)) && vertex._2._2.equals
        ("movie"))
        .distinct()
        .leftOuterJoin(literalAttributes.filter(literalTriple => (literalTriple._2._1.equals("imdbRating"))))
        .sortBy(element => element._2._2.get._2.asInstanceOf[Double], false)
        .foreach(t => println(t._2._1 +" " + t._2._2.get._2.asInstanceOf[Double]))
    }

    def getActorsInMovie(): Unit = {
      println("Enter movie:")
      val movie = Console.readLine()
      graph.triplets.filter(triplet => (triplet.srcAttr._1.equals(movie)) && triplet.attr.equals("actor"))
        .foreach(t => println(t.dstAttr._1))
    }
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("MoviePropertyGraph")
    val sc = new SparkContext(conf)
    val movieFile = ""
    val actorFile = ""
    val directorFile = ""
    val languageFile = ""

    val movies = readJsonsFile(movieFile, "movie", sc)
    val actors = readCsvFile(actorFile, "actor", sc)
    val directors = readCsvFile(directorFile, "director", sc)
    val languages = readCsvFile(languageFile, "language", sc)

    val entities = movies.union(actors).union(directors).union(languages)
    val relationships: RDD[Edge[String]] = readJsonsFileForEdges(movieFile, sc)
    val literalAttributes = readJsonsFileForLiteralProperties(movieFile, sc)
    val graph = new InitGraph(entities, relationships, literalAttributes)
// Some supported functionality in graphx
//    graph.getPropertyTriangles()
//    graph.getLiteralPropertyTriples()
//    graph.getConnectedComponents()

    var input = 0
    do {
      println("1. Movies of actor \n2. Movies of director \n3. Movies in language \n4. Top Movies beginning with " +
        "\n5. Actors in movie \n6. Exit\n")
      input = Console.readInt()
      input match {
        case 1 => graph.getMoviesOfActor()
        case 2 => graph.getMoviesOfDirector()
        case 3 => graph.getMoviesInLanguage()
        case 4 => graph.getTopMoviesBeginningWith()
        case 5 => graph.getActorsInMovie()
        case 6 => println("bye !!!")
        case something => println("Unexpected case: " + something.toString)
      }
    } while (input != 6);
    sc.stop
  }

  def readJsonsFile(filename: String, entityType: String, sparkContext:SparkContext):  RDD[(VertexId, (String, String))] = {
    try {
     val entityBuffer = new ListBuffer[(Long,(String,String))]
      for (line <- Source.fromFile(filename).getLines()) {
        val jsonDocument = parse(line)
        val movieId = (jsonDocument \ "movieId").values.toString.toLong
        val name = (jsonDocument \ "name").values.toString
        entityBuffer.append((movieId, (name, entityType)))
      }
      return sparkContext.parallelize(entityBuffer)
    } catch {
      case ex: Exception => println("Exception in reading file:" + filename + ex.getLocalizedMessage)
        return sparkContext.emptyRDD
    }
  }

  def readCsvFile(filename: String, entityType: String, sparkContext:SparkContext): RDD[(VertexId, (String, String))] = {
    try {
      val entityBuffer = new ListBuffer[(Long,(String,String))]
      for (line <- Source.fromFile(filename).getLines()) {
        // each line looks like: 'name': vertexId, 'name': vertexId
        entityBuffer.appendAll(line.split(",")
          .map { entry => val Array(k, v) = entry.split(":"); (k.trim.stripPrefix("'").stripSuffix("'"), v.trim.toLong) }
          .map { case(k,v) => (v,(k,entityType))})
      }
      return sparkContext.parallelize(entityBuffer)
    } catch {
      case ex: Exception => println("Exception in reading file:" + filename + ex.getLocalizedMessage)
        return sparkContext.emptyRDD
    }
  }

  def readJsonsFileForEdges(filename: String, sparkContext:SparkContext): RDD[Edge[String]] = {
    try {
      val relationshipBuffer = new ListBuffer[Edge[String]]
      for (line <- Source.fromFile(filename).getLines()) {
        val jsonDocument = parse(line)
        val movieId = (jsonDocument \ "movieId").values.toString.toLong
        val director = (jsonDocument \ "director").values.toString.toLong
        val language = (jsonDocument \ "inLanguage").values.toString.toLong
        // two edges (one in each direction) between the movie and each connected entity
        relationshipBuffer.append(Edge(movieId, director, "director"))
        relationshipBuffer.append(Edge(director, movieId, "movie"))
        relationshipBuffer.append(Edge(movieId, language, "inLanguage"))
        relationshipBuffer.append(Edge(language, movieId, "movie"))
        for (actor <- (jsonDocument \ "actors").children) {
          relationshipBuffer.append(Edge(movieId, actor.values.toString.toLong, "actor"))
          relationshipBuffer.append(Edge(actor.values.toString.toLong, movieId, "inFilm"))
        }
      }
      val relationshipRDD = sparkContext.parallelize(relationshipBuffer)
      println("Number of edges: " + relationshipRDD.count)
      return relationshipRDD
    } catch {
      case ex: Exception => println("Exception in reading file:" + filename + ex.getLocalizedMessage)
        return sparkContext.emptyRDD
    }
  }

  def readJsonsFileForLiteralProperties(filename: String, sparkContext:SparkContext): RDD[(Long,(String,Any))] = {
    try {
      val literalsBuffer = new ListBuffer[(Long,(String,Any))]
      for (line <- Source.fromFile(filename).getLines()) {
        val jsonDocument = parse(line)
        val movieId = (jsonDocument \ "movieId").values.toString.toLong
        literalsBuffer.append((movieId, ("imdbRating", (jsonDocument \ "imdbRating").values.toString.toDouble)))
        literalsBuffer.append((movieId, ("potentialAction", (jsonDocument \ "potentialAction").values.toString)))
      }
      return sparkContext.parallelize(literalsBuffer)
    } catch {
      case ex: Exception => println("Exception in reading file:" + filename + ex.getLocalizedMessage)
        return sparkContext.emptyRDD
    }
  }
}


The function "getTopMoviesBeginningWith" is the most interesting one: it joins the literal properties (the imdbRating) into the same RDD, sorts the entities by that rating, and so provides a crude autosuggest for movies starting with a given prefix.
My actors, directors and languages files were CSVs containing the names and the VertexId values I had pre-generated. For example,
'shah rukh khan': 75571848106
'subhash ghai': 69540999294
'hindi': 10036785003, 'telugu': 61331709614
My movie file was JSON, one document per movie, containing all this connected data. For example -
{"inLanguage": 10036785003,
  "director": 52308324411,
  "movieId": "0112870",
  "potentialAction": "http://www.imdb.com/title/tt0112870/",
  "name": "dilwale dulhania le jayenge",
  "imdbRating": 8.3,
 "actors": [75571848106, 19194912859, 79050811553, 37409465222, 44878963029, 10439885041, 85959578654, 31779460606, 39613049947, 29336046116, 64948799477, 71910110134, 1911950262, 19092081589, 77693233734]
}

If you happen to need these files, let me know and I can email them. Last but not least, I found building the project with SBT much easier than without it, as it gives a Gradle-like way to build projects.
Here is the sample sbt build file that I used to build the project -

name := "PropertyGraph"

version := "1.0"

scalaVersion := "2.10.4"

connectInput in run := true

// additional libraries
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.5.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.5.0",
  "org.apache.spark" %% "spark-hive" % "1.5.0",
  "org.apache.spark" %% "spark-streaming" % "1.5.0",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.5.0",
  "org.apache.spark" %% "spark-streaming-flume" % "1.5.0",
  "org.apache.spark" %% "spark-mllib" % "1.5.0",
  "org.apache.spark" %% "spark-graphx" % "1.5.0",
  "org.apache.commons" %% "commons-lang3" % "3.0",
  "org.eclipse.jetty"  %% "jetty-client" % "8.1.14.v20131031",
  "org.json4s" %% "json4s-native" % "{latestVersion}",
  "org.json4s" %% "json4s-jackson" % "{latestVersion}"
)

The experiment did not give me real-time graph queries (the way Neo4j or Virtuoso graph databases do), since Spark processing takes time on a single Spark node, and I assume that increasing the number of nodes would add further time for distributing the work and fetching the results. I hope this keeps getting better with future versions of Spark (1.6 is soon to be out !!).

Elasticsearch Delete Issues

Problem :-
We have some types of documents which have to be deleted daily once the date in them has expired. An alert from another tool fires if such an expired document shows up in the search results (the end service filters them out).
For the past month, we saw the alert getting triggered every night between 12am and 7:15am, after which it would disappear (no more data from yesterday).

Remember that these documents are deleted daily using an XDELETE query against the Elasticsearch cluster.
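
For reference, that nightly cleanup is essentially a delete-by-query call of this shape (a sketch only; the index, type and date field names are placeholders):

curl -XDELETE 'localhost:9200/my_index/my_type/_query' -d '{
  "query": {
    "range": {
      "expiryDate": { "lt": "now" }
    }
  }
}'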

Experiments :-
We run an hourly job to delete old data, so logically one expects the old data to disappear as soon as the first delete job runs after 12am. Yet we kept seeing the alert, with the log showing that a result from yesterday was still being returned.

I increased the job frequency to every 15 minutes, with a "refresh" on the index after each run, and yet the Splunk alert kept showing up until 7:15am every day. The pattern is that between 6am and 7am the number of old documents begins to decrease, and there are none left after 7:15am. Even an optimize with "only_expunge_deletes" did not shorten the time these documents kept appearing in the Elasticsearch results.
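
For context, those two calls look roughly like this (the index name is a placeholder):

# make recent deletes visible to search
curl -XPOST 'localhost:9200/my_index/_refresh'

# ask Lucene to merge away segments consisting mostly of deleted documents
curl -XPOST 'localhost:9200/my_index/_optimize?only_expunge_deletes=true'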

Possible causes :-
• The second-to-last comment on this forum reveals that Elasticsearch only marks documents as deleted, and by default a merge only happens on an index when 10% or more of its documents have been added or deleted. As we index daily, 7:15am is probably when a merge happens and the documents actually disappear.

• Though we see the docID in the logs, searching the Elasticsearch cluster for that ID returns no results, which points to the head plugin and the client API showing different results.

Possible solution :-
Note that the index does not use a TTL (time-to-live) value for the documents, but as per the Elasticsearch forums, people still see this issue even with TTL.
So supposedly this is how Lucene and Elasticsearch stay efficient: they limit merges, because a merge is an expensive operation from a memory perspective.
The likely solution given by Elasticsearch forum experts is to create a new index and swap indices whenever possible (which for us would mean swapping indices every day), since a fresh index searches faster by not having deleted documents in the lookup (overkill !!).

Considering that most Elasticsearch users talk about indexes holding millions of documents, it is not always feasible to reindex into a brand new index daily. So an alternate, lighter approach can be to update each expired document with a deleted field set to true and filter such documents out at query time.
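
A rough sketch of that workaround (the index, type, document ID and field names are placeholders):

# mark the expired document instead of deleting it
curl -XPOST 'localhost:9200/my_index/my_type/<docId>/_update' -d '{
  "doc": { "deleted": true }
}'

# exclude marked documents at query time
curl -XGET 'localhost:9200/my_index/_search' -d '{
  "query": {
    "filtered": {
      "query": { "match_all": {} },
      "filter": { "not": { "term": { "deleted": true } } }
    }
  }
}'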

Elasticsearch document inconsistency

Problem :-
A particular search query would sometimes return results and sometimes would not.


Root cause analysis :-
The pattern observed with the hits and misses was 011100, occurring repeatedly.
Elasticsearch by default rotates queries among the shard copies on its nodes, so if you want to direct your query to a particular node, the "preference" parameter can be set to a node ID.

The way to get a nodeID (the one that I found) is -
curl -XGET 'localhost:9200/_nodes'

wherein every node has an ID associated with it in the JSON response.
The nodeID can then be used as -
searchRequestBuilder.setPreference("_only_node:xyz");
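
In context with the Java transport client, narrowing things down looks roughly like this (a sketch; the index name, field and document ID are made up):

import org.elasticsearch.client.Client
import org.elasticsearch.index.query.QueryBuilders

// Run the same query pinned to one node at a time to find the misbehaving shard copy.
def searchOnNode(client: Client, nodeId: String) = {
  val searchRequestBuilder = client.prepareSearch("my_index")
    .setQuery(QueryBuilders.termQuery("docID", "some-document-id"))
    .setPreference("_only_node:" + nodeId)
  searchRequestBuilder.execute().actionGet()
}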

Using this preference, one can narrow down the shard copy that was showing the inconsistent behaviour for the given document. The shard stats showed the same "numDocs" as the other copies, so for some reason the document was simply not searchable on that shard.

I found a similar issue reported in the Elasticsearch groups.

Resolution :-
The Elasticsearch logs did not show any issue on the nodes, and a "refresh" and "flush" on the index neither produced any exception/error in the logs nor resolved the issue.
As per the issue thread, people had either recreated their indexes or stuck to using only the primary shard (neither was an option for us, as recreating the index would take a lot of time and hitting only the primary shard consistently gave us no hits).
The no-downtime way was to bring the replica count down to 0 and then create the replicas again (which luckily fixed the issue for us):

curl -XPUT 'localhost:9200/my_index/_settings' -d '{
  "index" : {
    "number_of_replicas" : 0
  }
}'
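
and then restore the replicas once the primaries are settled (assuming a single replica here):

curl -XPUT 'localhost:9200/my_index/_settings' -d '{
  "index" : {
    "number_of_replicas" : 1
  }
}'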

Run an indexing job to fill back in the documents lost in the process.

Possible Causes :-
We had had some downtime in the cluster due to high CPU usage, and such outages may have caused an inconsistency in the shards. Even so, we still got consistent results for most of our queries.
So the guess is that any such outage can cause Elasticsearch shards to become inconsistent, and the resolution depends on the number of documents in the index and the amount of downtime that can be afforded.

Camera Roll Face Filter - A training-free approach for filtering out photos containing the same face

Imagine digging through a lot of photos every time you want to send one from your smartphone to a WhatsApp contact or to your Facebook/Twitter account.

There has to be an easier way to filter the camera roll down to photos containing the required content (somebody's face).
This blog post is about a naive approach to solving this photo-search problem.


Algorithm -

1. Take any photo (one input and one to compare with the input)




2. Find the face in the image such that it contains the specific features used (no hair, but eyes, nose and mouth present), and resize the face to a 100 x 100 image



Note that this step uses an LBP cascade classifier to detect faces in the image. Find related information here -
OpenCV Cascade Classifier tutorial

static Mat cropFaceFromImg(InputArray _frame)
{
    Mat testFrame = _frame.getMat();
    std::vector<Rect> faces;
    Mat frame_gray;
    std::vector<Mat> croppedFaces;
    Mat resizedFace;//dst image
    
    cvtColor( testFrame, frame_gray, CV_BGR2GRAY );
    //equalizeHist( frame_gray, frame_gray );
    
    //-- Detect faces
    face_cascade.detectMultiScale( frame_gray, faces, 1.1, 2, 0|CV_HAAR_SCALE_IMAGE, Size(1, 1) );
    
    for( size_t i = 0; i < faces.size(); i++ )
    {
        Mat faceImg;
        Rect rect = Rect( faces[i].x + (faces[i].width/6), faces[i].y , faces[i].width*2/3, faces[i].height ); // ROI rect in srcImg
        frame_gray(rect).copyTo(faceImg);
        Size size(100,100);//the dst image size,e.g.100x100
        
        resize(faceImg,resizedFace,size);//resize image
        croppedFaces.push_back(resizedFace);
        
    }
    //imshow("ResizedFace", resizedFace);
    //waitKey(0);
    return resizedFace;

}

3. Calculate histogram for the image.


The essence of this step is comparing each pixel with its 8 neighboring pixels and assigning an 8-bit binary value H(I) to that pixel, based on whether each neighboring pixel is darker or lighter than the pixel at the center.
Related code to calculate histogram bins [From OpenCV src code]

//------------------------------------------------------------------------------
// cv::elbp
//------------------------------------------------------------------------------
template <typename _Tp> static
inline void elbp_(InputArray _src, OutputArray _dst, int radius, int neighbors) {
    //get matrices
    Mat src = _src.getMat();
    // allocate memory for result
    _dst.create(src.rows-2*radius, src.cols-2*radius, CV_32SC1);
    Mat dst = _dst.getMat();
    // zero
    dst.setTo(0);
    for(int n=0; n<neighbors; n++) {
        // sample points
        float x = static_cast<float>(radius * cos(2.0*CV_PI*n/static_cast<float>(neighbors)));
        float y = static_cast<float>(-radius * sin(2.0*CV_PI*n/static_cast<float>(neighbors)));
        // relative indices
        int fx = static_cast<int>(floor(x));
        int fy = static_cast<int>(floor(y));
        int cx = static_cast<int>(ceil(x));
        int cy = static_cast<int>(ceil(y));
        // fractional part
        float ty = y - fy;
        float tx = x - fx;
        // set interpolation weights
        float w1 = (1 - tx) * (1 - ty);
        float w2 =      tx  * (1 - ty);
        float w3 = (1 - tx) *      ty;
        float w4 =      tx  *      ty;
        // iterate through your data
        for(int i=radius; i < src.rows-radius;i++) {
            for(int j=radius;j < src.cols-radius;j++) {
                // calculate interpolated value
                float t = static_cast<float>(w1*src.at<_Tp>(i+fy,j+fx) + w2*src.at<_Tp>(i+fy,j+cx) + w3*src.at<_Tp>(i+cy,j+fx) + w4*src.at<_Tp>(i+cy,j+cx));
                // floating point precision, so check some machine-dependent epsilon
                dst.at<int>(i-radius,j-radius) += ((t > src.at<_Tp>(i,j)) || (std::abs(t-src.at<_Tp>(i,j)) < std::numeric_limits<float>::epsilon())) << n;
            }
        }
    }

}

4. Compare the histogram with the histogram of the input face image (this is similar to what the OpenCV face recognizer called Local Binary Patterns Histograms does).


For this step, OpenCV has four different types of Histogram compare methods.
I use the CV_COMP_CORREL method, as it performs a basic correlation and its value does not change if we swap the compared photo and the input. The formula for the correlation of histogram bins is -

d(H_1,H_2) =  \frac{\sum_I (H_1(I) - \bar{H_1}) (H_2(I) - \bar{H_2})}{\sqrt{\sum_I(H_1(I) - \bar{H_1})^2 \sum_I(H_2(I) - \bar{H_2})^2}}
where
  \bar{H_k} =  \frac{1}{N} \sum _J H_k(J)

and N is the total number of histogram bins.
[From Histogram Comparison OpenCV tutorials]
The above formula gives a value of 1 if the images are identical, and in general its value grows with the similarity of the images.

5. If the histogram correlation is more than 0.6, the image can be considered to belong to the same person.
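
Putting steps 4 and 5 together, here is a simplified sketch using OpenCV's built-in histogram functions (the real LBPH recognizer uses spatial histograms over a grid of cells; the single 256-bin histogram and the 0.6 threshold here just follow the steps above):

#include <opencv2/imgproc/imgproc.hpp>

// Compare two LBP images (as produced by elbp_ above) via the correlation of
// their 256-bin histograms and apply the naive 0.6 same-person threshold.
static bool isSamePerson(const cv::Mat& lbp1, const cv::Mat& lbp2)
{
    cv::Mat a, b;
    lbp1.convertTo(a, CV_8U);            // calcHist expects 8U/16U/32F input
    lbp2.convertTo(b, CV_8U);

    int histSize = 256;                  // one bin per possible 8-bit LBP code
    float range[] = { 0, 256 };
    const float* ranges[] = { range };
    int channels[] = { 0 };

    cv::Mat h1, h2;
    cv::calcHist(&a, 1, channels, cv::Mat(), h1, 1, &histSize, ranges);
    cv::calcHist(&b, 1, channels, cv::Mat(), h2, 1, &histSize, ranges);

    double correlation = cv::compareHist(h1, h2, CV_COMP_CORREL);
    return correlation > 0.6;            // step 5: naive same-person threshold
}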


Pros :-
1. A single-step classifier that can improve as search queries accumulate, by combining data sets of similar face searches.
2. No face modeling is done, so it relies on the simplicity of histogram creation and comparison.
3. Lightweight enough for analysis on a mobile platform.

Cons:-
1. A naive classifier with a fixed 0.6 threshold, so the chances of false positives and false negatives are high.
2. No face modeling is done, so critical features of the face may be neglected by the basic histogram approach.

Note:- This is still a naive way to filter faces, but the algorithm can be made adaptive: with each iteration of searches, we can keep improving accuracy by combining search results for the same person and filtering out the faces that do not fall into the intersection set.

The GitHub repo (source code) for the above can be found here.