MongoDB ORM框架Morphia缓存小坑

最近在用Morphia更新MongoDB数据时发现更新完后没有拿到更新后的新数据,一开始没有在意以为MongDB复制集读写分离后拿的从服务器数据,从服务器还没有同步到更新。后来一想不太对,复制集的op同步的间隔很短,更新和查询又是在一个上下文中,而且当时读写相当空闲,虽然NOSQL都是保证最终一致性,但如果这么简单的一个操作都没办法保证原子性那也太搓了,而且用MongoDB这么多年也没发现类似的问题。

重现代码:

Test
1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void test() {
Query<Blog> q = datastore.createQuery(Blog.class).field("author").equal("b");
System.out.println(q.get());
Query<Blog> ctQ = datastore.createQuery(Blog.class);
ctQ.field("author").equal("b");
UpdateOperations<Blog> u = datastore.createUpdateOperations(Blog.class);
u.set("post", "zzzz");
datastore.update(ctQ, u);
System.out.println(q.get());
}

执行一下查询,然后在更新,在查询。根据日志显示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#####第一次执行查询,post是1111111
Creating MappedClass for class mongo.service.Blog
[TRACE] 2017-11-02 09:21:00,506 [main] [org.mongodb.morphia.query.QueryImpl] - Running query(Blog) : { "author" : "b"}, fields:null,off:0,limit:1
[TRACE] 2017-11-02 09:21:00,506 [main] [org.mongodb.morphia.query.QueryImpl] - Getting cursor(Blog) for query:{ "author" : "b"}
[DEBUG] 2017-11-02 09:21:00,506 [main] [org.mongodb.driver.protocol.command] - Sending command {find : BsonString{value='Blog'}} to database PTMO_TEST on connection [connectionId{localValue:2, serverValue:7577}] to server 26.47.136.186:57004
[DEBUG] 2017-11-02 09:21:00,508 [main] [org.mongodb.driver.protocol.command] - Command execution completed
Blog(_id=59fa6b4b934f0741cd03663e, author=b, post=1111111, comments=null)
####### 执行更新,更新post为zzzz
[TRACE] 2017-11-02 09:21:00,513 [main] [org.mongodb.morphia.DatastoreImpl] - Executing update(Blog) for query: { "author" : "b"}, ops: { "$set" : { "post" : "zzzz"}}, multi: true, upsert: false
[DEBUG] 2017-11-02 09:21:00,520 [main] [org.mongodb.driver.protocol.update] - Updating documents in namespace PTMO_TEST.Blog on connection [connectionId{localValue:2, serverValue:7577}] to server 26.47.136.186:57004
[DEBUG] 2017-11-02 09:21:00,529 [main] [org.mongodb.driver.protocol.update] - Update completed
######第二次执行查询,查询出来的对象post仍然是1111111
[TRACE] 2017-11-02 09:21:00,531 [main] [org.mongodb.morphia.query.QueryImpl] - Running query(Blog) : { "author" : "b"}, fields:null,off:0,limit:1
[TRACE] 2017-11-02 09:21:00,531 [main] [org.mongodb.morphia.query.QueryImpl] - Getting cursor(Blog) for query:{ "author" : "b"}
[DEBUG] 2017-11-02 09:21:00,532 [main] [org.mongodb.driver.protocol.command] - Sending command {find : BsonString{value='Blog'}} to database PTMO_TEST on connection [connectionId{localValue:2, serverValue:7577}] to server 26.47.136.186:57004
Blog(_id=59fa6b4b934f0741cd03663e, author=b, post=1111111, comments=null)

就是这迷惑性的日志,让我觉得每次 q.get()就是向数据库进行查询操作。
我一开始以为是数据没有落实所以读到是老数据,于是修改update时 WriteConcern进行更新。

Test
1
datastore.update(ctQ,u, false, WriteConcern.JOURNALED);
  • JOURNALED: 确保数据到磁盘上的事务日志中才返回。
  • MAJORITY: 大多数数据节点确认后才返回。
  • ACKNOWLEDGED: 默认机制,服务器收到就返回。
  • UNACKNOWLEDGED: socket write后没报异常就返回。
  • W1/W2/W3: 1/2/3个成员确认后才返回。

然而仍然没什么用,不是数据落实问题。后来想到MongoDB中原子性的更新是 findAndModify ,于是改成findAndModify:

Test
1
2
3
4
// datastore.update(ctQ, u);
Blog b = datastore.findAndModify(ctQ, u);
System.out.println(b);

执行后确实拿到了最新的数据,虽然问题可以换成FAM解决但仍然想知道问题的原因,于是跟了一下get()的源码。

Query get的实现类 QueryImpl:

QueryImpl
1
2
3
4
5
6
7
8
@Override
public T get() {
final int oldLimit = limit;
limit = 1;
final Iterator<T> it = fetch().iterator();
limit = oldLimit;
return (it.hasNext()) ? it.next() : null;
}

实际上是fetch方法执行的查询

QueryImpl
1
2
3
4
5
6
7
8
9
@Override
public MorphiaIterator<T, T> fetch() {
final DBCursor cursor = prepareCursor();
if (LOG.isTraceEnabled()) {
LOG.trace("Getting cursor(" + dbColl.getName() + ") for query:" + cursor.getQuery());
}
return new MorphiaIterator<T, T>(ds, cursor, ds.getMapper(), clazz, dbColl.getName(), cache);
}

在prepareCursor()方法中,确实可以看到框架封装了DBObject以及DBCollection,并确确实实的调用了find方法进行查询:

QueryImpl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Prepares cursor for iteration
*
* @return the cursor
*/
public DBCursor prepareCursor() {
final DBObject query = getQueryObject();
final DBObject fields = getFieldsObject();
if (LOG.isTraceEnabled()) {
LOG.trace("Running query(" + dbColl.getName() + ") : " + query + ", fields:" + fields + ",off:" + offset + ",limit:" + limit);
}
final DBCursor cursor = dbColl.find(query, fields);
cursor.setDecoderFactory(ds.getDecoderFact());
...
}

但是返回的cursor封装到了MorphiaIterator里,并且传参跟随着一个cache。
MorphiaIterator也是Iterator的实现,其中封装next操作,在获取到DBOBject进行反序列化操作,具体反序列化操作调用的Mapper类中的方法:

Mapper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* Converts a DBObject back to a type-safe java object (POJO)
*
* @param <T> the type of the entity
* @param datastore the Datastore to use when fetching this reference
* @param dbObject the DBObject containing the document from mongodb
* @param entity the instance to populate
* @param cache the EntityCache to use
* @return the entity
*/
public <T> T fromDb(final Datastore datastore, final DBObject dbObject, final T entity, final EntityCache cache) {
//hack to bypass things and just read the value.
if (entity instanceof MappedField) {
readMappedField(datastore, (MappedField) entity, entity, cache, dbObject);
return entity;
}
// check the history key (a key is the namespace + id)
if (dbObject.containsField(ID_KEY) && getMappedClass(entity).getIdField() != null
&& getMappedClass(entity).getEntityAnnotation() != null) {
final Key<T> key = new Key(entity.getClass(), getCollectionName(entity.getClass()), dbObject.get(ID_KEY));
final T cachedInstance = cache.getEntity(key);
if (cachedInstance != null) {
return cachedInstance;
} else {
cache.putEntity(key, entity); // to avoid stackOverflow in recursive refs
}
}
final MappedClass mc = getMappedClass(entity);
final DBObject updated = mc.callLifecycleMethods(PreLoad.class, entity, dbObject, this);
try {
for (final MappedField mf : mc.getPersistenceFields()) {
readMappedField(datastore, mf, entity, cache, updated);
}
} catch (final MappingException e) {
Object id = dbObject.get(ID_KEY);
String entityName = entity.getClass().getName();
throw new MappingException(format("Could not map %s with ID: %s in database '%s'", entityName, id,
datastore.getDB().getName()), e);
}
if (updated.containsField(ID_KEY) && getMappedClass(entity).getIdField() != null) {
final Key key = new Key(entity.getClass(), getCollectionName(entity.getClass()), updated.get(ID_KEY));
cache.putEntity(key, entity);
}
mc.callLifecycleMethods(PostLoad.class, entity, updated, this);
return entity;
}

可以看到22行-28行是有缓存操作的,缓存的key是集合名和_id值,也就是说虽然第二个get确实执行了查询操作,但由于_id一样所以Morphia直接获取了cachedInstance操作,导致拿到的是第一次的缓存结果。
根据cache的注释来看,在同一个query对象的查询返回的都是缓存。
所以后续的查询其实有一部分是为了获取_id值用于拼cache key的,如果命中就取缓存。
比较坑的是如果有多次查询,并且查询结果比较大,后续的查询其实是浪费,这种缓存机制应该交给开发者进行控制,还能省一些查询。