当前位置:编程学习 > JAVA >>

mahout算法源码分析之Itembased Collaborative Filtering(一)PreparePreferenceMatrixJob

Mahout版本:0.7,hadoop版本:1.0.4,jdk:1.7.0_25 64bit。
本篇分析RecommenderJob的源码,这个类也是继承了AbstractJob,所以也会覆写其run方法,点开这个run方法,可以看到和其他的job类都一样,刚开始都是基本参数的默认值设置和获取;然后到了第一个job,在这个job之前有一个shouldRunNextPhase()函数,点开这个函数看到下面的源码:
[java] 
protected static boolean shouldRunNextPhase(Map<String, List<String>> args, AtomicInteger currentPhase) {  
    int phase = currentPhase.getAndIncrement();  
    String startPhase = getOption(args, "--startPhase");  
    String endPhase = getOption(args, "--endPhase");  
    boolean phaseSkipped = (startPhase != null && phase < Integer.parseInt(startPhase))  
        || (endPhase != null && phase > Integer.parseInt(endPhase));  
    if (phaseSkipped) {  
      log.info("Skipping phase {}", phase);  
    }  
    return !phaseSkipped;  
  }  
其中phase是获取当前的phase值的,关于phase的相关概念可以参考:mahout中phase的含义,这里可以看到主要是根据phase和startPhase、endPhase的值做比较,然后返回true或者false,因为在实战中是按默认值的(startPhase和endPhase都没有设置),所以RecommenderJob中的这个函数都是返回true的。
看第一个job的调用:
[java]  
if (shouldRunNextPhase(parsedArgs, currentPhase)) {  
      ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[]{  
              "--input", getInputPath().toString(),  
              "--output", prepPath.toString(),  
              "--maxPrefsPerUser", String.valueOf(maxPrefsPerUserInItemSimilarity),  
              "--minPrefsPerUser", String.valueOf(minPrefsPerUser),  
              "--booleanData", String.valueOf(booleanData),  
              "--tempDir", getTempPath().toString()});  
  
      numberOfUsers = HadoopUtil.readInt(new Path(prepPath, PreparePreferenceMatrixJob.NUM_USERS), getConf());  
    }  
这里看到调用的job主类是PreparePreferenceMatrixJob,然后这个job的输入参数有输入、出、maxPrefsPerUser、minPrefsPerUser、booleanData、tempDir。那么就打开主类PreparePreferenceMatrixJob,来看看。这个PreparePreferenceMatrixJob同样实现了AbstractJob类,那么直接看run方法吧。在run中的参数设置里有一个ratingShift,这个在调用的时候没有使用,所以按照默认,设置为0.0。大致浏览一下发现一共有三个prepareJob,所以这个主类会产生3个job。下面来一个个来看:
(1)//convert items to an internal index
[java] 
Job itemIDIndex = prepareJob(getInputPath(), getOutputPath(ITEMID_INDEX), TextInputFormat.class,  
            ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class, ItemIDIndexReducer.class,  
            VarIntWritable.class, VarLongWritable.class, SequenceFileOutputFormat.class);  
输入格式:userid,itemid,value
先看mapper:
[java]  
protected void map(LongWritable key,  
                     Text value,  
                     Context context) throws IOException, InterruptedException {  
    String[] tokens = TasteHadoopUtils.splitPrefTokens(value.toString());  
    long itemID = Long.parseLong(tokens[transpose ? 0 : 1]);  
    int index = TasteHadoopUtils.idToIndex(itemID);  
    context.write(new VarIntWritable(index), new VarLongWritable(itemID));  
  }  
在map中,首先获得itemID,在tokens中tokens[1]即是itemID了,至于当transpose为true的时候就要选择tokens[0]作为itemID这个应该是其他的应用吧,由于在调用的时候没有设置这个参数,所以这里按照默认值为false,所以选择tokens[1]作为itemID。然后看到index和itemID的转换使用的是TasteHadoopUtils.idToIndex()函数,看到这个函数返回的是return 0x7FFFFFFF & Longs.hashCode(id);所以当这个数在int可以表示的数范围内(小于2147483647)时候就会返回这个数本身了,比如实战中的项目101,返回的index也是101。
 
再看reducer:
[java]  
protected void reduce(VarIntWritable index,  
                        Iterable<VarLongWritable> possibleItemIDs,  
                        Context context) throws IOException, InterruptedException {  
    long minimumItemID = Long.MAX_VALUE;  
    for (VarLongWritable varLongWritable : possibleItemIDs) {  
      long itemID = varLongWritable.get();  
      if (itemID < minimumItemID) {  
        minimumItemID = itemID;  
      }  
    }  
    if (minimumItemID != Long.MAX_VALUE) {  
      context.write(index, new VarLongWritable(minimumItemID));  
    }  
  }  
总感觉这里没啥必要,reducer返回的还是101-->101,或者这里应该有什么说法的?
输出文件是ITEMID_INDEX,输出格式<key,value>   :   VarintWritable-->VarLongWritable  
所以这个job就分析完了。
(2)//convert user preferences into a vector per user
[java] 
Job toUserVectors = prepareJob(getInputPath(), getOutputPath(USER_VECTORS), TextInputFormat.class,  
            ToItemPrefsMapper.class, VarLongWritable.class, booleanData ? VarLongWritable.class : EntityPrefWritable.class,  
            ToUserVectorsReducer.class, VarLongWritable.class, VectorWritable.class, SequenceFileOutputFormat.class);  
输入格式:userid,itemid,value
看mapper:(ToItemPrefsMapper继承ToEntityPrefsMapper,而ToItemPrefsMapper是空的,所以看ToEntityPrefsMapper)
[java] 
public void map(LongWritable key,  
                  Text value,  
                  Context context) throws IOException, InterruptedException {  
    String[] tokens = DELIMITER.split(value.toString());  
    long userID = Long.parseLong(tokens[0]);  
    long itemID = Long.parseLong(tokens[1]);  
    if (itemKey ^ transpose) {  
      // If using items as keys, and not transposing items and users, then users are items!  
      // Or if not using items as keys (users are, as usual), but transposing items and users,  
      // then users are items! Confused?  
      long temp = userI
补充:软件开发 , Java ,
CopyRight © 2012 站长网 编程知识问答 www.zzzyk.com All Rights Reserved
部份技术文章来自网络,