在Linux下正确的使用Java锁

最近同事在linux上部署一个开源项目,为了防止这个项目生成的文件被外部其他程序修改/删除掉,他们稍微修改了下源代码在文件生成后给文件加锁,在他们开发机(Windows)上测试后没问题,就部署到服务器(CentOS)上了,结果不行,锁不住,生成的文件可以被第三方程序修改删除掉。
于是叫我帮忙调一下,因为我开发机用Linux Mint。

看现象十之八九就是 建议锁(ADVISORY)的问题,所以也懒的看。

猜测他们源码大体是这么写的:

1
2
3
4
5
6
7
8
9
10
FileChannel channel = new RandomAccessFile(file, "rw").getChannel();
try(FileLock lock = channel.tryLock()) {
if (lock == null) {
log.debug("lock on file:{}", file.getPath());
// do something
// 花了很多时间
} else {
}
}

首先是结论:
代码没问题,不需改动所以不需要调什么。

Linux文件锁的类型有两种:

  • 1.mandatory 强制锁,加上后第三方程序对加过锁的文件修改会报错:文件被占用、文件无法被修改等。这个是排它的,一般有这种需求的都想要这种锁。
  • 2.advisory 建议锁/劝告锁,加上后第三方程序(gedite、vim/vi等、touch、rm等)可以直接修改删除。这个是 建议 的,也就是说它确实提供了加锁和检测是否有锁的手段,但假设你的第三方程序根本不检测文件有没有锁就直接修改/删除了,它也不 排斥,所以只对那些修改前try一下的守规矩程序有效。

再看一下JDK里FileLock的注释:

1
2
3
4
5
6
7
8
9
10
11
12
Whether or not a lock actually prevents another program from accessing
the content of the locked region is system-dependent and therefore
unspecified. The native file-locking facilities of some systems are merely
advisory, meaning that programs must cooperatively observe a known
locking protocol in order to guarantee data integrity. On other systems
native file locks are mandatory, meaning that if one program locks a
region of a file then other programs are actually prevented from accessing
that region in a way that would violate the lock. On yet other systems,
whether native file locks are advisory or mandatory is configurable on a
per-file basis. To ensure consistent and correct behavior across platforms,
it is strongly recommended that the locks provided by this API be used as if
they were advisory locks.

大意为是否能阻止另外的程序访问依赖于操作系统,有些操作系统下是advisory,有些则是mandatory。

一般来说绝大部分发行版(Ubuntu、CentOS、Debian)等默认获取的都是advisory锁,可以使用命令:

1
cat /proc/locks

看下当前系统所有已有的锁的类型。

如果想在Linux下mandatory锁,需要在操作系统挂载文件系统时激活才可以。

1
mount -o mand /dev/sdb7 /mnt

使用Ansible2.0部署JDK8

最近需要在多台新服务器部署Java环境,想到要重复同样的事这么多次就头疼,后来突然想到运维神器Ansible可以自动化部署,并且仅依赖ssh,只要在一台服务器上安装Ansible就可以,所以就想用Ansible来解决,因为毕竟偏运维所以也不想深究什么原理架构之类的,还是以解决实际问题为主,写死路径,丑陋什么的都无所谓。

所以没跟着官网文档做,直接在网上找了一篇入门教程就干了起来。
根据网上的教程添加个hosts文件用于管理服务器ip与提取公共变量。

hosts
1
2
3
4
5
6
[local]
192.168.100.129
[dw]
192.168.100.130
192.168.100.131
192.168.100.132

然后开始写yml格式playbook。

main.yml
1
2
3
4
5
6
7
8
9
10
11
12
- name: mkdir
shell: mkdir -p /root/java/
- name: copy jdk to remote host
copy: src=jdk-8u161-linux-x64.tar.gz dest=/root/java/
- name: unzip jdk
shell: tar -zxf /root/java/jdk-8u161-linux-x64.tar.gz -C /root/java/
- name: set jdk_env copy use template
template: src=java_home.sh.j2 dest=/root/java/set_jdk.sh
- name: execute script to set jdkenv
shell: sh /root/java/set_jdk.sh
- name: source bash_profile
shell: source /root/.bash_profile

  • 在root新建文件夹。
  • 拷贝jdk到新建的文件夹。
  • 解压jdk。
  • 拷贝设置环境变量的脚本到新建的文件夹下。
  • 执行设置环境变量的脚本。
  • 使刚才设置的环境变量生效。

看起来没什么问题,于是直接执行。

1
ansible-playbook playbook/roles/java/tasks/main.yml

直接报错:

1
2
3
4
5
6
7
The error appears to have been in '/etc/ansible/playbook/roles/java/tasks/main.yml': line 1, column 1, but may
be elsewhere in the file depending on the exact syntax problem.
The offending line appears to be:
- name: mkdir
^ here

看错误是解析yml失败了,猜测可能是空格或tab的原因导致,查看了下yml官方要求是用一个空格,但脚本里也都是空格没有tab。

没办法最后只能翻官方文档,看了官方的例子发现playbook的脚本格式与网上的教程都不一样,所以当时想会不会是因为用的Ansible2.0版本格式规则全变了,又不向下兼容导致。
所以写了个新建文件的小例子测试下。果然,2.0的格式与低版本的不一样,并且不向下兼容老本的playbook。

完整正确的Ansible2.0部署JDK的例子是:

  • 1.约定优于配置的情况下,官方建议的playbook、文件等存放的目录结构应该是:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    /etc/ansible/
    ├── hosts
    └── playbook
    └── roles
    └── java
    └── tasks
    ├── files
    │   └── jdk-8u161-linux-x64.tar.gz
    ├── main.yml
    └── templates
    └── java_home.sh.j2
  • 2.playbook文件格式:

    main.yml
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    - hosts: dw
    tasks:
    - name: mkdir
    shell: mkdir -p /root/java/
    - name: copy jdk to remote host
    copy: src=jdk-8u161-linux-x64.tar.gz dest=/root/java/
    - name: unzip jdk
    shell: tar -zxf /root/java/jdk-8u161-linux-x64.tar.gz -C /root/java/
    - name: set jdk_env copy use template
    template: src=java_home.sh.j2 dest=/root/java/set_jdk.sh
    - name: execute script to set jdkenv
    shell: sh /root/java/set_jdk.sh
    - name: source bash_profile
    shell: source /root/.bash_profile

增加了hosts,用于指定部署的服务器组,并且原本独立的命令移动到tasks下。

  • 3.设置环境变量的脚本:

    java_home.sh.j2
    1
    2
    3
    4
    5
    #!/bin/bash
    echo 'export JAVA_HOME=/root/java/jdk1.8.0_161' >> /root/.bash_profile
    echo 'export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH' >> /root/.bash_profile
    echo 'export CLASSPATH=.$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$JAVA_HOME/lib/tools.jar' >> /root/.bash_profile
    source ~/.bash_profile
  • 最后执行命令:

    1
    ansible-playbook playbook/roles/java/tasks/main.yml

控制台输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
PLAY [dw] ***********************************************************************************************************************************************************************************************************
TASK [Gathering Facts] **********************************************************************************************************************************************************************************************
ok: [192.168.100.131]
ok: [192.168.100.132]
ok: [192.168.100.130]
TASK [mkdir] ********************************************************************************************************************************************************************************************************
[WARNING]: Consider using the file module with state=directory rather than running mkdir. If you need to use command because file is insufficient you can add warn=False to this command task or set
command_warnings=False in ansible.cfg to get rid of this message.
changed: [192.168.100.130]
changed: [192.168.100.132]
changed: [192.168.100.131]
TASK [copy jdk to remote host] **************************************************************************************************************************************************************************************
changed: [192.168.100.132]
changed: [192.168.100.130]
changed: [192.168.100.131]
TASK [unzip jdk] ****************************************************************************************************************************************************************************************************
[WARNING]: Consider using the unarchive module rather than running tar. If you need to use command because unarchive is insufficient you can add warn=False to this command task or set command_warnings=False in
ansible.cfg to get rid of this message.
changed: [192.168.100.130]
changed: [192.168.100.131]
changed: [192.168.100.132]
TASK [set jdk_env copy use template] ********************************************************************************************************************************************************************************
changed: [192.168.100.130]
changed: [192.168.100.131]
changed: [192.168.100.132]
TASK [execute script to set jdkenv] *********************************************************************************************************************************************************************************
changed: [192.168.100.130]
changed: [192.168.100.131]
changed: [192.168.100.132]
TASK [source bash_profile] ******************************************************************************************************************************************************************************************
changed: [192.168.100.130]
changed: [192.168.100.131]
changed: [192.168.100.132]
PLAY RECAP **********************************************************************************************************************************************************************************************************
192.168.100.130 : ok=7 changed=6 unreachable=0 failed=0
192.168.100.131 : ok=7 changed=6 unreachable=0 failed=0
192.168.100.132 : ok=7 changed=6 unreachable=0 failed=0

执行成功,部署全部正确完成。

Spark踩坑3-共享Scala集合类

最近为了更好的了解Spark的细节,改用Spark的亲儿子Scala进行开发,得益于框架API的统一,Spark方面没什么需要重新学习的,开发完就想着按照以前的经验优化一下。
记得官方Tuning Spark中,第一件要干的事就是调整序列化类库,Spark默认使用的Java serialization的序列化方式。官方建议使用Kryo类库进行序列化可以获得更好的压缩率和传输效率。
切换的方式很简单,只要在配置时指定序列化方式为:org.apache.spark.serializer.KryoSerializer即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
object Recommender {
private val log = Logger(Recommender.getClass)
// load config
private val config = ConfigFactory.load("config.properties")
// spark config
private val appName = config.getString("appName")
private val mode = config.getString("mode")
def main(args: Array[String]): Unit = {
log.info("init {},mode:{}", appName, mode)
// init spark
val sc = new SparkConf()
.setAppName(appName)
.setMaster(mode)
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val session = SparkSession.builder().config(sc).getOrCreate()
try {
initMat(session)
var headMap = new mutable.LinkedHashMap[String, Int]()
headMap.put("ID", 0)
headMap.put("UID", 1)
val headBroadcast = ss.sparkContext.broadcast[mutable.LinkedHashMap[String, Int]](headMap)
// 略...
} finally {
session.close()
}
}
}

但是当我切换序列化方式后,我发现我要广播的一个LinkedHashMap每次获取都是空的。
于是猜测Kryo框架是不是没办法序列化Scala的集合类型,因为大部分的序列化框架对集合类都有或多或少的一些限制,比如Protostuff等。
于是写个测试代码测试一下:

1
2
3
4
5
6
7
8
9
10
def main(args: Array[String]): Unit = {
var headMap = new mutable.LinkedHashMap[String, Int]()
headMap.put("a", 1)
headMap.put("b", 2)
headMap.put("c", 3)
//
val kryo = new Kryo()
val output = new Output(new FileOutputStream("e:\\ser.bin"))
kryo.writeObject(output, headMap)
}

最终写到磁盘ser.bin的才1个字节,都不用反序列化,肯定不对。
查了一些官方文档,有个相对简单点的解决办法:

1
kryo.register(classOf[mutable.HashMap[String, (String, String)]], new JavaSerializer)

将原生类做为自定义的类注册下,并指定序列化方式仍然使用jdk的。。。
虽然问题能解决但这其实已经背离本意了,好在一点是mutable.HashMap使用jdk序列化,其他的类使用kryo,相比较以前全部用jdk效率确实高点。

Spark踩坑2—Lambda表达式序列化异常

这几天用Spark写一些正式的功能又踩到坑了,当submit程序到yarn以client模式运行时出现:

1
cannot assign instance of java.lang.invoke.SerializedLambda to field

异常显示序列化lambda异常。。。。
诡吊的一点在于cluster模式没有问题。
关于client模式和cluster模式网上有详细的解释,简单来说:

  • cluster模式下ResourceManger会在集群中的某台NodeManger服务器上启动一个ApplicationMaster,也就是Driver的容器,然后将ApplicationMaster分配给这台NodeManger。
  • client模式下ResourceManger会在本地启动ApplicationMaster并将其分配给本地的NodeManger。

由于用的JDK8所有程序中确实有大量的Lambda,一直觉得Lambda是匿名内部类的语法糖而已,运行时该擦除擦除,看到lang包下有个专门的SerializedLambda就知道不仅仅是个语法糖而已。
根据stackoverflow上的一解答来看:

Implementors of serializable lambdas, such as compilers or language runtime libraries, are expected to ensure that instances deserialize properly. One means to do so is to ensure that the writeReplace method returns an instance of SerializedLambda, rather than allowing default serialization to proceed.
当编译器或一个运行时的类库序列化一个lambad时,先要确保实例正确的反序列化,其中一个方法是确保writeReplace方法返回的是一个SerializedLambda实例,而不是用默认的序列化去处理。

SerializedLambda has a readResolve method that looks for a (possibly private) static method called $deserializeLambda$(SerializedLambda) in the capturing class, invokes that with itself as the first argument, and returns the result. Lambda classes implementing $deserializeLambda$ are responsible for validating that the properties of the SerializedLambda are consistent with a lambda actually captured by that class.
当反序列化时候SerializedLambda有一个readResolve方法,它会在capturing class(个人理解为调用lambda的那个类)查找一个可能私有的静态方法$deserializeLambda$,并将自己作为参数进行调用,实现$deserializeLambda$ 方法负责验证SerializedLambda的属性是否与调用的lambda一致。

所以说如果没有定义$deserializeLambda$这个方法,反序列化仍然会调用这个方法进行验证。
如果要解决这个问题有两种方法。

  • 1.不写lambda,换成匿名内部类。
  • 2.将包含lambda的类单独打包,使用Spark的setJar方法提交这个jar。代码:
    1
    2
    3
    SparkConf sconf = new SparkConf()
    .setJars(new String[]{"/root/lambda.jar"})
    .setMaster("yarn");

但仍然搞不懂的是为何cluster模式下没这问题,猜想是因为client模式下代码不进行完整的分发,所以包含lambda的代码无法在其他服务器上正确的序列化和反序列化,而cluster模式下都是整个打包分发到集群中其他的NodeManager中执行,所以没问题?
因为setJar方法的注释说明:传入的jar包会被分发到集群中的其他work中。

Spark踩坑1—无法初始化Main

上周用Spark开发了一个小程序,打包好submit到yarn后slave却一直报错:

1
java.lang.NoClassDefFoundError: Could not initialize class Main

代码:

Main
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* @author liuxinsi
* @mail akalxs@gmail.com
*/
public class Main {
private static final JavaSparkContext sc = new JavaSparkContext(
new SparkConf().setAppName(“Loader”).setMaster(“yarn”)
);
private static final SQLContext sqlContext = new SQLContext(sc);
public static void main(String[]() args) {
DataFrame df = sqlContext.read().json("data.json");
// 一些基本操作,略...
}
}

但开发时在local环境下则一点问题也没有。
一开始怀疑yarn环境里有重复的fatjar导致两个包的Main冲突,后来检查了一遍没问题,yarn本身也是隔离的。
google了一圈基本上都是说序列化或环境的问题。
仔细想了一下当submit到yarn的流程是把Spark的环境和提交的jar打了一个zip包放到HDFS上进行分发,而Spark程序运行时仅在数据混洗shuffle或其他传输情况才会进行序列化,而且无论是jdk的序列化还是kryo的序列化,如果对象不能序列化也是报序列化的异常,不可能连入口类都没初始化就跑到序列化的步骤去了。

环境问题的话也在加了-verbose情况下仔细检查了加载的jar包和顺序,也没发现什么问题。
最后实在没辙只能不停翻官方提供的例子看看有没有什么头绪,找了半天最后突然发现唯一一点不同就是官方的小demo里的所有SparkContext的初始都是放在函数里,也就是都是局部变量,而我一直都是写成全局变量。所以猜想会不会是这个原因导致的,后来把代码改成:

Main
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* @author liuxinsi
* @mail akalxs@gmail.com
*/
public class Main {
public static void main(String[]() args) {
JavaSparkContext sc = new JavaSparkContext(
new SparkConf().setAppName(“Loader”).setMaster(“yarn”)
);
SQLContext sqlContext = new SQLContext(sc);
DataFrame df = sqlContext.read().json("data.json");
// 一些基本操作,略...
}
}

打包后提交果然解决,虽然Spark的亲儿子是Scala这种函数式编程语言,但怎么也想不明白一个全局变量为什么会没办法初始化。
还有坑人的一点是本地local[*]模式下也测不出这种问题。

Gson反序列化包含泛型的对象

最近在重构一个遗留项目,项目中关于Json序列化/反序列化操作几乎都是显示的手动序列化,代码散落在各个方法中,丝毫没有利用框架的自动序列化机制。
重构的第一步是去除冗余和提炼共通方法。所以将序列化与反序列提取出来。
但提取反序列化带泛型的对象出了问题。
由于泛型在编译时会被擦除的特性,一般情况下要反序列化包含泛型的对象时需要用到Gson的TypeToken用于返回泛型类。举例:

Response
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 响应对象。<br/>
* {@link #errMesg}和{@link #errDetail}仅在状态{@link #status}为{@link Status#ERROR}时出现。
*
* @author liuxinsi
* @mail akalxs@gmail.com
*/
@Data
public class Response<T> {
/**
* 业务数据
*/
private T data;
/**
* 响应状态
*/
private Status status;
/**
* 错误消息
*/
private String errMesg;
/**
* 错误堆栈
*/
private String errDetail;
public enum Status {
SUCCESS, ERROR;
}
}
ValidateTokenResponse
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 对应{@link com.lxs.mms.auth.resource.AuthenticationResource#validateToken(ValidateTokenRequest)} 响应。
*
* @author liuxinsi
* @mail akalxs@gmail.com
*/
@Data
@ApiModel(value = "验证Token响应")
public class ValidateTokenResponse {
@ApiModelProperty(value = "Token是否合法,true合法,false非法。")
private Boolean legally;
@ApiModelProperty(value = "Token中的Audience属性")
private String audience;
@ApiModelProperty(value = "描述")
private String desc;
}

包装类Response用于抽象共通的响应,其中data是各个业务的对象,所以是个泛型。
ValidateTokenResponse,具体的业务响应。封装时候将对象Set到Response后进行序列化。

反序列化时由于泛型被擦,如果直接用fromJson方法则data的类型不会是期望的业务类,而是List<LinkedHashMap>。
所以Gson提供了TypeToken用于处理这个问题:

1
2
3
4
Gson gson = new Gson();
Type jsonType = new TypeToken<Response<ValidateTokenResponse>>() {
}.getType();
Response<ValidateTokenResponse> r=gson.fromJson(json, jsonType);

由于TypeToken的构造是protected的,所以需要new一个匿名子类(anonymous subclass),在构造的时候TypeToken会根据你显示传入的泛型获取泛型类并作为ParameterizedType返回。这样在fromJson的时候就可以知道具体的泛型类,然后反序列化时就可以得到正确的data类型。

所以在提取这种代码时候进行的封装则按常理应该是:

1
2
3
4
5
6
public static <T> Response<T> unwrap(String json) {
Gson gson = new Gson();
Type jsonType = new TypeToken<Response<T>>() {
}.getType();
return gson.fromJson(json, jsonType);
}

看起来毫无破绽,编译运行都没有问题,但就是data属性还是List
因为上面的T是一个类型参数(java.lang.reflect.TypeVariable)并不是具体的业务类。所以TypeToken没法取得期待的泛型类。
根据TypeToken的源码来看它在获取泛型类的时候实际上返回了一个ParameterizedType用于代表泛型类。
所以按照思路只要实现这个接口然后告诉它正确的泛型类就可以了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static <T> Response<T> unwrap(String json, Class<?> clazz) {
return gson.fromJson(json, new ParameterizedType() {
/**
* 原始类型实际的泛型类,与外部传入<code>clazz</code>显示指明。
*/
@Override
public Type[] getActualTypeArguments() {
return new Class[]{clazz};
}
/**
* 原始类型。
*/
@Override
public Type getRawType() {
return Response.class;
}
/**
* 如果是内部类需要指明所属的对象,如果不是返回null。
*/
@Override
public Type getOwnerType() {
return null;
}
});
}

调用时:

1
Response<ValidateTokenResponse> r = ResponseUnwrap.unwrap(json,ValidateTokenResponse.class);

就可以正确的处理反序列化。
其实还可以封装的更灵活点,比如处理多个泛型需要修改传参和getActualTypeArguments方法的返回,如果是传集合类型还要单独处理下,不应该写死RawType。

MongoDB ORM框架Morphia缓存小坑

最近在用Morphia更新MongoDB数据时发现更新完后没有拿到更新后的新数据,一开始没有在意以为MongDB复制集读写分离后拿的从服务器数据,从服务器还没有同步到更新。后来一想不太对,复制集的op同步的间隔很短,更新和查询又是在一个上下文中,而且当时读写相当空闲,虽然NOSQL都是保证最终一致性,但如果这么简单的一个操作都没办法保证原子性那也太搓了,而且用MongoDB这么多年也没发现类似的问题。

重现代码:

Test
1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void test() {
Query<Blog> q = datastore.createQuery(Blog.class).field("author").equal("b");
System.out.println(q.get());
Query<Blog> ctQ = datastore.createQuery(Blog.class);
ctQ.field("author").equal("b");
UpdateOperations<Blog> u = datastore.createUpdateOperations(Blog.class);
u.set("post", "zzzz");
datastore.update(ctQ, u);
System.out.println(q.get());
}

执行一下查询,然后在更新,在查询。根据日志显示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#####第一次执行查询,post是1111111
Creating MappedClass for class mongo.service.Blog
[TRACE] 2017-11-02 09:21:00,506 [main] [org.mongodb.morphia.query.QueryImpl] - Running query(Blog) : { "author" : "b"}, fields:null,off:0,limit:1
[TRACE] 2017-11-02 09:21:00,506 [main] [org.mongodb.morphia.query.QueryImpl] - Getting cursor(Blog) for query:{ "author" : "b"}
[DEBUG] 2017-11-02 09:21:00,506 [main] [org.mongodb.driver.protocol.command] - Sending command {find : BsonString{value='Blog'}} to database PTMO_TEST on connection [connectionId{localValue:2, serverValue:7577}] to server 26.47.136.186:57004
[DEBUG] 2017-11-02 09:21:00,508 [main] [org.mongodb.driver.protocol.command] - Command execution completed
Blog(_id=59fa6b4b934f0741cd03663e, author=b, post=1111111, comments=null)
####### 执行更新,更新post为zzzz
[TRACE] 2017-11-02 09:21:00,513 [main] [org.mongodb.morphia.DatastoreImpl] - Executing update(Blog) for query: { "author" : "b"}, ops: { "$set" : { "post" : "zzzz"}}, multi: true, upsert: false
[DEBUG] 2017-11-02 09:21:00,520 [main] [org.mongodb.driver.protocol.update] - Updating documents in namespace PTMO_TEST.Blog on connection [connectionId{localValue:2, serverValue:7577}] to server 26.47.136.186:57004
[DEBUG] 2017-11-02 09:21:00,529 [main] [org.mongodb.driver.protocol.update] - Update completed
######第二次执行查询,查询出来的对象post仍然是1111111
[TRACE] 2017-11-02 09:21:00,531 [main] [org.mongodb.morphia.query.QueryImpl] - Running query(Blog) : { "author" : "b"}, fields:null,off:0,limit:1
[TRACE] 2017-11-02 09:21:00,531 [main] [org.mongodb.morphia.query.QueryImpl] - Getting cursor(Blog) for query:{ "author" : "b"}
[DEBUG] 2017-11-02 09:21:00,532 [main] [org.mongodb.driver.protocol.command] - Sending command {find : BsonString{value='Blog'}} to database PTMO_TEST on connection [connectionId{localValue:2, serverValue:7577}] to server 26.47.136.186:57004
Blog(_id=59fa6b4b934f0741cd03663e, author=b, post=1111111, comments=null)

就是这迷惑性的日志,让我觉得每次 q.get()就是向数据库进行查询操作。
我一开始以为是数据没有落实所以读到是老数据,于是修改update时 WriteConcern进行更新。

Test
1
datastore.update(ctQ,u, false, WriteConcern.JOURNALED);
  • JOURNALED: 确保数据到磁盘上的事务日志中才返回。
  • MAJORITY: 大多数数据节点确认后才返回。
  • ACKNOWLEDGED: 默认机制,服务器收到就返回。
  • UNACKNOWLEDGED: socket write后没报异常就返回。
  • W1/W2/W3: 1/2/3个成员确认后才返回。

然而仍然没什么用,不是数据落实问题。后来想到MongoDB中原子性的更新是 findAndModify ,于是改成findAndModify:

Test
1
2
3
4
// datastore.update(ctQ, u);
Blog b = datastore.findAndModify(ctQ, u);
System.out.println(b);

执行后确实拿到了最新的数据,虽然问题可以换成FAM解决但仍然想知道问题的原因,于是跟了一下get()的源码。

Query get的实现类 QueryImpl:

QueryImpl
1
2
3
4
5
6
7
8
@Override
public T get() {
final int oldLimit = limit;
limit = 1;
final Iterator<T> it = fetch().iterator();
limit = oldLimit;
return (it.hasNext()) ? it.next() : null;
}

实际上是fetch方法执行的查询

QueryImpl
1
2
3
4
5
6
7
8
9
@Override
public MorphiaIterator<T, T> fetch() {
final DBCursor cursor = prepareCursor();
if (LOG.isTraceEnabled()) {
LOG.trace("Getting cursor(" + dbColl.getName() + ") for query:" + cursor.getQuery());
}
return new MorphiaIterator<T, T>(ds, cursor, ds.getMapper(), clazz, dbColl.getName(), cache);
}

在prepareCursor()方法中,确实可以看到框架封装了DBObject以及DBCollection,并确确实实的调用了find方法进行查询:

QueryImpl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Prepares cursor for iteration
*
* @return the cursor
*/
public DBCursor prepareCursor() {
final DBObject query = getQueryObject();
final DBObject fields = getFieldsObject();
if (LOG.isTraceEnabled()) {
LOG.trace("Running query(" + dbColl.getName() + ") : " + query + ", fields:" + fields + ",off:" + offset + ",limit:" + limit);
}
final DBCursor cursor = dbColl.find(query, fields);
cursor.setDecoderFactory(ds.getDecoderFact());
...
}

但是返回的cursor封装到了MorphiaIterator里,并且传参跟随着一个cache。
MorphiaIterator也是Iterator的实现,其中封装next操作,在获取到DBOBject进行反序列化操作,具体反序列化操作调用的Mapper类中的方法:

Mapper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* Converts a DBObject back to a type-safe java object (POJO)
*
* @param <T> the type of the entity
* @param datastore the Datastore to use when fetching this reference
* @param dbObject the DBObject containing the document from mongodb
* @param entity the instance to populate
* @param cache the EntityCache to use
* @return the entity
*/
public <T> T fromDb(final Datastore datastore, final DBObject dbObject, final T entity, final EntityCache cache) {
//hack to bypass things and just read the value.
if (entity instanceof MappedField) {
readMappedField(datastore, (MappedField) entity, entity, cache, dbObject);
return entity;
}
// check the history key (a key is the namespace + id)
if (dbObject.containsField(ID_KEY) && getMappedClass(entity).getIdField() != null
&& getMappedClass(entity).getEntityAnnotation() != null) {
final Key<T> key = new Key(entity.getClass(), getCollectionName(entity.getClass()), dbObject.get(ID_KEY));
final T cachedInstance = cache.getEntity(key);
if (cachedInstance != null) {
return cachedInstance;
} else {
cache.putEntity(key, entity); // to avoid stackOverflow in recursive refs
}
}
final MappedClass mc = getMappedClass(entity);
final DBObject updated = mc.callLifecycleMethods(PreLoad.class, entity, dbObject, this);
try {
for (final MappedField mf : mc.getPersistenceFields()) {
readMappedField(datastore, mf, entity, cache, updated);
}
} catch (final MappingException e) {
Object id = dbObject.get(ID_KEY);
String entityName = entity.getClass().getName();
throw new MappingException(format("Could not map %s with ID: %s in database '%s'", entityName, id,
datastore.getDB().getName()), e);
}
if (updated.containsField(ID_KEY) && getMappedClass(entity).getIdField() != null) {
final Key key = new Key(entity.getClass(), getCollectionName(entity.getClass()), updated.get(ID_KEY));
cache.putEntity(key, entity);
}
mc.callLifecycleMethods(PostLoad.class, entity, updated, this);
return entity;
}

可以看到22行-28行是有缓存操作的,缓存的key是集合名和_id值,也就是说虽然第二个get确实执行了查询操作,但由于_id一样所以Morphia直接获取了cachedInstance操作,导致拿到的是第一次的缓存结果。
根据cache的注释来看,在同一个query对象的查询返回的都是缓存。
所以后续的查询其实有一部分是为了获取_id值用于拼cache key的,如果命中就取缓存。
比较坑的是如果有多次查询,并且查询结果比较大,后续的查询其实是浪费,这种缓存机制应该交给开发者进行控制,还能省一些查询。

Tomcat在部署应用时卡住

最新在一台新的CentOS上部署环境,当部署完Tomcat8准备启起来看看效果时发现一直阻塞在部署应用这条日志下。

1
Deploying web application directory /tomcat/webapps/live

记得新下载的tomcat解压后直接启动一点问题没有,但为了安全清空了webapps下的tomcat ROOT和Manage应用后,并把我的应用传上来在启动就卡住了,一直无解。
后来google到了一篇文章

1
2
3
4
5
6
7
8
9
One thing I noticed on one of my first upgrades is that TC7 can often
take a long time to start up due to slow initialization of the
SessionIdGenerator - it can take up to nearly 2 minutes! It appears
to take longer if I restart TC7 quickly which seems to confirm that a
lack of entropy is the issue.
org.apache.catalina.util.SessionIdGenerator-: Creation of SecureRandom
instance for session ID generation using [SHA1PRNG] took [105,014]
milliseconds.

大体意思为Tomcat7+依赖 SecureRandom 提供的随机数给分布式Session做ID,取决于JRE,可能会导致Tomcat启动的延迟,上面这个人大概等了2分钟左右。

有两种解决方法.
1.修改Tomcat catalina.sh配置,设置非阻塞的SecureRandom初始化。

1
-Djava.security.egd=file:/dev/./urandom

2.修改JVM配置$JAVA_PATH/jre/lib/security/java.security。

1
securerandom.source=file:/dev/urandom

替换成

1
securerandom.source=file:/dev/./urandom

修改后,启动时间恢复正常。

利用NFS进行Linux之间的目录共享

最近需要在linux下共享个目录到另外一台linux下用用,如果有桌面环境用鼠标点点就可以轻松配个samba出来,但服务器版则要改一堆配置才行,太麻烦。后来突然想到如果只是linux之间的共享不涉及到windows其实用NFS就可以,还简单一点。

NFS简单描述:

NFS是一个sun创建的网络协议,NFS运行在一个区域网直接进行文件共享。etc…

1.修改/etc/exports文件。

exports文件用于暴露需要共享的文件给NFS客户端,是一个访问控制列表。

2.添加要共享的目录到文件。

1
2
vi /etc/exports
/home/xxx/ttt/ 26.47.136.*(rw,sync,no_root_squash)

配置以空格分割。

  • /home/xxx/ttt/ 指要共享的目录。
  • 26.47.136.* 术语叫:Machine Name Formats,理解为客户端的授权方式,这里配置的允许客户端来自26.47这个网段,还支持域名、nis netgroups等方式。
  • 括号里的是配置项
    • rw 允许读写
    • sync 确保文件存储到磁盘后才响应客户端,与之对应的是async。
    • no_root_squash 使用者如果是root,那么对于这个目录他就有root权限。一般正式环境不建议用这个选项,不安全,与之对应的是root_squash。

保存修改后,NFS服务端的配置已经完成。运行脚本启动NFS服务:

1
2
/etc/init.d/portmap start
/etc/init.d/nfs start

3.另外一台服务器做为客户端挂载共享目录。

1
2
mount -t nfs 26.47.136.19:/home/xxx/ttt /mnt/xxx
ls /mnt/xxx

挂载类型使用nfs,就可以将共享目录挂载到/mnt/xxx下。

出现的问题

1
failed: RPC Error: Program not registered

使用命令查看NFS服务端的服务器进程:

1
ps aux|grep rpc

正常应该有两个进程 rpc.statd 和 rpc.mountd,如果没有启动则手动启起来,一般在/usr/sbin下。

1
Access deny

检查下iptables,hosts.allow等配置。

在基于Spring-Boot的Jersey中集成Swagger

最近一时兴起,想将最近几年工作中用到的微服务栈,提干,优化一下作为Seed项目记录一下,省着忘了。
my-microservice-seed

本来想引入Swagger来作为Api描述与设计,以前在一个基于Spring REST开发的一个小后台项目中简单的初探过,配置起来很方便。
只要引入io.springfox的两个依赖,在提供一个配置类,配置类中在指定下下RestControlle所有在的包既可通过swagger-ui.html看到生成的描述。like this

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* @author liuxinsi
* @mail akalxs@gmail.com
*/
@Configuration
@EnableSwagger2
public class Swagger {
@Bean
public Docket createRestApi() {
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
.select()
.apis(RequestHandlerSelectors.basePackage("com.lxs.mms.rs.resource"))
.paths(PathSelectors.any())
.build();
}
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
.title("MMS")
.description("my microservice seed")
.version("1.0.0")
.contact(new Contact("liuxinsi", "https://liuxinsi.github.io", "akalxs@gmail.com"))
.build();
}
}

但这次试用的是Jersey,配置后没办法产生文档,根据日志看感觉配置的resource下都没扫到东西。最后在io.springfox与Swagger的github wiki上查了一下.

原来springfox提供的Swagger实现仅能作用于基于Spring MVC的REST实现,Jersey不行。。
想要用Swagger对Jersey生成的接口产生描述文档则需要:

  • 用Swagger-api提供的Swagger-core对Jersey的resource产生Swagger2的描述文件(类似WADL/WSDL)即swagger.json。
  • 在github上下载swagger-ui.
  • 用直接运行html或docker或nginx等运行swagger-ui。
  • 调整跨域策略或让swagger-ui与Jersey在同一域下。
  • 访问生成的swagger.json
  • all shit done.

乍一看颇为麻烦,但其实除了要手动处理下swagger-ui以外其他也还好,都是Swagger需要的步骤无非是自己做还是springfox帮你做好了。

完整代码

1
2
3
4
5
<dependency>
<groupId>io.swagger</groupId>
<artifactId>swagger-jersey2-jaxrs</artifactId>
<version>1.5.0</version>
</dependency>

引入swagger依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* Swagger配置。
*
* @author liuxinsi
* @mail akalxs@gmail.com
*/
@Component
@Log4j2
public class Swagger {
@PostConstruct
public void initSwagger() {
log.debug("init Swagger ");
BeanConfig config = new BeanConfig();
config.setTitle("MMS");
config.setDescription("my microservice seed");
config.setVersion("1.0.0");
config.setContact("liuxinsi");
config.setSchemes(new String[]{"http", "https"});
config.setBasePath(JerseyConfig.APPLICATION_PATH);
config.setResourcePackage(JerseyConfig.RESOURCE_PACKAGE_NAME);
config.setPrettyPrint(true);
config.setScan(true);
}
}

配置整个服务的相关信息以及Resource所在包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.lxs.mms.rs.resource;
import io.swagger.jaxrs.listing.ApiListingResource;
import io.swagger.jaxrs.listing.SwaggerSerializers;
import org.glassfish.jersey.logging.LoggingFeature;
import org.glassfish.jersey.server.ResourceConfig;
import org.slf4j.bridge.SLF4JBridgeHandler;
import org.springframework.stereotype.Component;
import javax.ws.rs.ApplicationPath;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* jersey 相关配置。
*
* @author liuxinsi
* @mail akalxs@gmail.com
*/
@Component
@ApplicationPath(JerseyConfig.APPLICATION_PATH)
public class JerseyConfig extends ResourceConfig {
public static final String APPLICATION_PATH = "services";
public static final String RESOURCE_PACKAGE_NAME = "com.lxs.mms.rs.resource";
/**
* 覆盖jersey logging 自带的jul logger
*/
private static final Logger REQ_RESP_LOGGER = Logger.getLogger("payload-logger");
public JerseyConfig() {
// 移除根日志处理器
SLF4JBridgeHandler.removeHandlersForRootLogger();
// 绑定新的处理器
SLF4JBridgeHandler.install();
// 请求 响应日志
REQ_RESP_LOGGER.setLevel(Level.FINE);
LoggingFeature lf = new LoggingFeature(REQ_RESP_LOGGER);
register(lf);
// 配置Swagger
this.register(ApiListingResource.class);
this.register(SwaggerSerializers.class);
packages(RESOURCE_PACKAGE_NAME);
}
}

注册两个feature,一个用于生成api信息,一个用于编解码产生swagger.json。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.lxs.mms.rs.resource.user;
import com.lxs.mms.rs.resource.ResourceSupport;
import com.lxs.mms.rs.resource.bean.user.UserInfo;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.stereotype.Component;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* @author liuxinsi
* @mail akalxs@gmail.com
*/
@Component
@Path("/user/v1")
@Api(value = "用户相关服务", produces = "application/json")
public class UserResource extends ResourceSupport {
@ApiOperation(value = "加载所有用户", notes = "要分页")
@GET
@Path("/loadUsers")
public List<UserInfo> loadUsers() {
List<UserInfo> userInfos = new ArrayList<>(10);
for (int i = 0; i < 100; i++) {
UserInfo u = new UserInfo();
u.setId(i + "");
u.setName("u" + i);
u.setPwd("");
u.setRegisteDate(new Date());
userInfos.add(u);
}
return userInfos;
}
}

Resource类,注意@Api注解。

这时可以先启动spring-boot,访问http://127.0.0.1:8888/services/swagger.json,如生成了swagger2的信息则配置都成功。
部署swagger-ui,我是将下载到的静态资源直接部署在spring-boot的resource/static中,让swagger-ui的请求与swagger在一个域下。
最后重新启动spring-boot访问swagger-ui界面。


all shit done。