深入浅出Rust Future – Part 5

原文:Rust futures: an uneducated, short and hopefully not boring tutorial - Part 5 - Streams

Intro

在上篇文章中我们学习了如何实现一个高效率的Future(尽量不阻塞, 只有在需要时才会Unpark我们的Task). 今天继续扩展我们的Future: 实现一个Stream Trait.
StreamIterators看起来很像: 他们随着时间的推移产生多个相同类型的输出, 与Iterators唯一的区别就是消费的方式不同. 让我们一起尝试使用Reactor来处理Streams吧.

ForEach combinator

我们使用一个名为for_each的组合器, 来代替我们手动迭代消费Stream. 查询文档不难发现future::stream实现了ForEach, 所以我们不仅可以迭代, 也可以把stream放入Reactor, 把它作为Future Chain的一部分. 这看起来简直太酷了.现在让我们一步一步来实现一个简单的Stream.

impl Stream

Stream TraitFuture Trait很像:

pub trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;

    // <-- CUT -->
}

pub trait Stream {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;

    // <-- CUT -->
}

这两个Trait都有很多的函数, 由于这些函数都有默认值, 因此如果你不需要它, 就无需实现他们. 在本篇文章里我们只关注poll这个方法.

    // Future
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;

    // Stream
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;

对比下FutureStream两者poll函数的区别:

Situation Future Stream
Item to return ready Ok(Async::Ready(t)) Ok(Async::Ready(Some(t)))
Item to return not ready Ok(Async::NotReady) Ok(Async::NotReady)
No more items to return N.A. Ok(Async::Ready(None))
Error Err(e) Err(e)

Simple stream

让我们一起实现一个简单的stream:

struct MyStream {
    current: u32,
    max: u32,
}

impl MyStream {
    pub fn new(max: u32) -> MyStream {
        MyStream {
            current: 0,
            max: max,
        }
    }
}

impl Stream for MyStream {
    type Item = u32;
    type Error = Box<Error>;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        match self.current {
            ref mut x if *x < self.max => {
                *x = *x + 1;
                Ok(Async::Ready(Some(*x)))
            }
            _ => Ok(Async::Ready(None)),
        }
    }
}

我们重点关注下poll函数, 形参传递了一个可变引用, 所以我们可以改变MyStream内部的值. 这段代码理解起来很容易:

检查MyStream.current是否大于 MyStream.max 如果大于: 返回Ok(Async::Ready(None)), 否则MyStream.current自增1并且返回当前的值.

Consume a stream

let mut reactor = Core::new().unwrap();
let my_stream = MyStream::new(5);

let fut = my_stream.for_each(|num| {
    println!("num === {}", num);
    ok(())
});

注意ok(()), 这段代码意味着我们返回的是个Future, 所以我们不仅可以使用Reactor执行fut, 也可以跟别的Future, 组合成Future Chain.

Spawn futures during the event loop

我们在处理Stream时, 有时候想创建(spawn:派生)新的Future, 这样做理由有很多, 比如不想阻塞当前的Future Task, Rust 是允许我们使用Reactorexecute函数将创建的Future加入现有的事件循环中的. 然而这有一个陷阱: execute 返回的是Result<(), ExecuteError<f>>, 可以看出这个函数正常返回时,没有任何的值.


impl Stream for MyStream {
    type Item = u32;
    type Error = Box<Error>;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        use futures::future::Executor;

        match self.current {
            ref mut x if *x < self.max => {
                *x = *x + 1;

                self.core_handle.execute(WaitInAnotherThread::new(
                    Duration::seconds(2),
                    format!("WAIT {:?}", x),
                ));
                Ok(Async::Ready(Some(*x)))
            }
            _ => Ok(Async::Ready(None)),
        }
    }
}

这里需要关注的是execute这段代码, 它产生一个新的Future(等待两秒, 然后打印x), 不过请记住, 这个future将不会返回任何值(除了Error), 所以我们当且仅当把他是一个Daemon-like线程.

测试Code:


fn main() {
    let mut reactor = Core::new().unwrap();

    // create a Stream returning 5 items
    // Each item will spawn an "inner" future
    // into the same reactor loop
    let my_stream = MyStream::new(5, reactor.handle());

    // we use for_each to consume
    // the stream
    let fut = my_stream.for_each(|num| {
        println!("num === {:?}", num);
        ok(())
    });

    // this is a manual future. it's the same as the
    // future spawned into our stream
    let wait = WaitInAnotherThread::new(Duration::seconds(3), "Manual3".to_owned());

    // we join the futures to let them run concurrently
    let future_joined = fut.map_err(|err| {}).join(wait);

    // let's run the future
    let ret = reactor.run(future_joined).unwrap();
    println!("ret == {:?}", ret);
}

上端代码我们展示了如何连接StreamFuture. 现在让我尝试跑一下我们的代码:


num === 1
num === 2
num === 3
num === 4
num === 5
"Manual3" starting the secondary thread!
"Manual3" not ready yet! parking the task.
"WAIT 1" starting the secondary thread!
"WAIT 1" not ready yet! parking the task.
"WAIT 2" starting the secondary thread!
"WAIT 2" not ready yet! parking the task.
"WAIT 3" starting the secondary thread!
"WAIT 3" not ready yet! parking the task.
"WAIT 4" starting the secondary thread!
"WAIT 4" not ready yet! parking the task.
"WAIT 5" starting the secondary thread!
"WAIT 5" not ready yet! parking the task.
"WAIT 1" the time has come == 2017-12-06T10:23:30.853796527Z!
"WAIT 1" ready! the task will complete.
"WAIT 2" the time has come == 2017-12-06T10:23:30.853831227Z!
"WAIT 2" ready! the task will complete.
"WAIT 3" the time has come == 2017-12-06T10:23:30.853842927Z!
"WAIT 3" ready! the task will complete.
"WAIT 5" the time has come == 2017-12-06T10:23:30.853856927Z!
"WAIT 5" ready! the task will complete.
"WAIT 4" the time has come == 2017-12-06T10:23:30.853850427Z!
"WAIT 4" ready! the task will complete.
"Manual3" the time has come == 2017-12-06T10:23:31.853775627Z!
"Manual3" ready! the task will complete.
ret == ((), ())

这个结果不是唯一的, 你和我的输出也许有所不同, 如果我们没有派生等待3s的Future, 结果是否就会有所不同?


fn main() {
    let mut reactor = Core::new().unwrap();

    // create a Stream returning 5 items
    // Each item will spawn an "inner" future
    // into the same reactor loop
    let my_stream = MyStream::new(5, reactor.handle());

    // we use for_each to consume
    // the stream
    let fut = my_stream.for_each(|num| {
        println!("num === {:?}", num);
        ok(())
    });

    // let's run the future
    let ret = reactor.run(fut).unwrap();
    println!("ret == {:?}", ret);
}

我们会注意到这段代码几乎会立即返回下面的值:


num === 1
num === 2
num === 3
num === 4
num === 5
ret == ()

poll 函数里派生出的Future没有机会运行.

Next steps

下一篇我们将会使用await!()来精简我们的Future`.

深入浅出Rust Future – Part4 – A real future from scratch

\
译自Rust futures: an uneducated, short and hopefully not boring tutorial - Part 4 - A "real" future from scratch

Intro

上三篇文章我们阐述如何处理Future的基础知识, 我们现在能组织多个Future成为一个Future chain, 执行他们,甚至创建他们.但是到现在我们的Future还没有贴近我们日常的使用场景。(But, so far, our futures are not really delegating the execution to another thing.)
在Part-3中我们用了粗暴的方法来Unpark Future。虽然解决了问题,并且使Reactor变得相对高效,但是这不是最佳实践。今天就让我们换种更好的方式去实现Future

A timer future

我们可以创建一个最简单的Timer Future(就像我们在Part-3章节所做的那样). 但这一次,我们不会立即Unpark Future Task, 而是一直Parked, 直到这个Future准备完为止. 我们该怎样实现? 最简单的方式就是再委派一个线程。这个线程将等待一段时间, 然后Unpark我们的Future Task.

这就像在模拟一个AsyncIO的使用场景。当我们异步做的一些事情已经完成我们会收到与之相应的通知。为了简单起见,我们认为Reactor是单线程的,在等待通知的时候可以做其他的事情。

Timer revised

我们的结构体非常简单.他包含结束日期和任务是否在运行。

pub struct WaitInAnotherThread {
    end_time: DateTime<Utc>,
    running: bool,
}

impl WaitInAnotherThread {
    pub fn new(how_long: Duration) -> WaitInAnotherThread {
        WaitInAnotherThread {
            end_time: Utc::now() + how_long,
            running: false,
        }
    }
}

DateTime类型和Duration持续时间来自chronos crate.

Spin wait

实现等待时间的函数:

pub fn wait_spin(&self) {
    while Utc::now() < self.end_time {}
    println!("the time has come == {:?}!", self.end_time);
}

fn main() {
    let wiat = WaitInAnotherThread::new(Duration::seconds(30));
    println!("wait spin started");
    wiat.wait_spin();
    println!("wait spin completed");
}

在这种情况下,我们基本上会根据到期时间检查当前时间。 这很有效,而且非常精确。 这种方法的缺点是我们浪费了大量的CPU周期。 在我的电脑上, CPU一个核心完全被占用,这和我们Part-3遇到的情况一致。

Spin wait这种方式只适用于等待时间非常短的场景, 或者你没有别的选择的情况下使用它。

Sleep wait

系统通常会允许你的线程Park一段特定的时间.这通常被称为线程睡眠。睡眠线程X秒,换据换的意思是: 告诉系统X秒内,不需要调度我。这样的好处是,CPU可以在这段时间内干别的事情。在Rust中我们使用std::thread::sleep().

pub fn wait_blocking(&self) {
    while Utc::now() < self.end_time {
        let delta_sec = self.end_time.timestamp() - Utc::now().timestamp();
        if delta_sec > 0 {
            thread::sleep(::std::time::Duration::from_secs(delta_sec as u64));
        }
    }
    println!("the time has come == {:?}!", self.end_time);
}

let wiat = WaitInAnotherThread::new(Duration::seconds(30));
println!("wait blocking started");
wiat.wait_blocking();
println!("wait blocking completed");

尝试运行我们的代码会发现, 改进过的代码再也不会完全占用一个CPU核心了。改进过的代码比我们该开始写的性能好多了,但是这就是Future了吗?

Future

当然不是,我们还没有实现Future Trait, 所以,我们现在实现它。

impl Future for WaitInAnotherThread {
    type Item = ();
    type Error = Box<Error>;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        while Utc::now() < self.end_time {
            let delta_sec = self.end_time.timestamp() - Utc::now().timestamp();
            if delta_sec > 0 {
                thread::sleep(::std::time::Duration::from_secs(delta_sec as u64));
            }
        }
        println!("the time has come == {:?}!", self.end_time);
        Ok(Async::Ready(())
}

emmm,这块代码我们是不是以前见过, 跟上篇我们写的一个很像,它会阻塞Reactor,这样做实在是太糟糕了。

Future 应该尽可能的不要阻塞。

一个Reactor的最佳实践应该至少包含下面几条:

  • 当主Task需要等待别的Task时,应该停止它。
  • 不要阻塞当前线程。
  • 任务完成时向Reactor发送信号。

我们要做的是创建另一个睡眠线程. 睡眠的线程是不会占用CPU资源。所以在另一个线程里Reactor还像往常那样,高效的工作着。当这个Sleep Thread醒来后, 它会Unpark这个任务, 并且通知Reactor

让我们一步一步完善我们的想法:

impl Future for WaitInAnotherThread {
    type Item = ();
    type Error = Box<Error>;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        if Utc::now() < self.end_time {
            println!("not ready yet! parking the task.");

            if !self.running {
                println!("side thread not running! starting now!");
                self.run(task::current());
                self.running = true;
            }

            Ok(Async::NotReady)
        } else {
            println!("ready! the task will complete.");
            Ok(Async::Ready(()))
        }
    }
}

我们只需要创建一个并行线程, 所以我们需要有个字段来判断(WaitInAnotherThread.runing),当前需不需要创建这个线程。这里需要注意的是当Future被轮询之前,这些代码是不会被执行的。当然我们还会检测当前时间是否大于过期时间,如果大于,也不会产生另外一个线程。

如果end_time大于当前的时间并且另一个线程没有被创建,程序就会立即创建一个新的线程。然后程序会返回Ok(Async::NotReady()), 与我们Part-3中所做的相反,我们不会在这里Unpark Task. 这是另一个线程应该做的事情。在别的实现中,例如IO,唤醒我们的主线程的应该是操作系统。

fn run(&mut self, task: task::Task) {
    let lend = self.end_time;

    thread::spawn(move || {
        while Utc::now() < lend {
            let delta_sec = lend.timestamp() - Utc::now().timestamp();
            if delta_sec > 0 {
                thread::sleep(::std::time::Duration::from_secs(delta_sec as u64));
            }
            task.notify();
        }
        println!("the time has come == {:?}!", lend);
    });
}

这里有两件事情需要注意下.

  • 我们将Task引用传递给,另一个并行的线程。这很重要,因为我们不能在单独的线程里使用task::current.

  • 我们不能将self移动到闭包中,所以我们需要转移所有权至lend变量.为啥这样做?

    Rust中的线程需要实现具有'static生命周期的Send Trait

task自身实现了上述的要求所以可以引用传递。但是我们的结构体没有实现,所以这也是为什么要移动end_time的所有权。这就意味着当线程被创建后你不能更改end_time.

让我们尝试运行下:

fn main() {
    let mut reactor = Core::new().unwrap();

    let wiat = WaitInAnotherThread::new(Duration::seconds(3));
    println!("wait future started");
    let ret = reactor.run(wiat).unwrap();
    println!("wait future completed. ret == {:?}", ret);
}

运行结果:

Finished dev [unoptimized + debuginfo] target(s) in 0.96 secs
Running `target/debug/tst_fut_complete`
wait future started
not ready yet! parking the task.
side thread not running! starting now!
the time has come == 2017-11-21T12:55:23.397862771Z!
ready! the task will complete.
wait future completed. ret == ()

让我们总结下流程:

  • 我们让Reactor执行我们的Future.

  • Future发现end_time大于当前时间:

    1. Park Task
    2. 开启另一个线程
  • 副线程在一段时间后被唤醒:

    1. 告诉Reactor我们的Task可以Unpark了。
    2. 销毁自身
  • Reactor唤醒被ParkTask

  • Future(Task)完成了自身的任务:

    1. 通知Reactor
    2. 返回相应的结果
  • reactor将任Task的输出值返回给run函数的调用者。

Code

extern crate chrono;
extern crate futures;

extern crate tokio_core;

use chrono::prelude::*;
use chrono::*;
use futures::prelude::*;
use futures::*;
use std::error::Error;
use std::thread::{sleep, spawn};
use tokio_core::reactor::Core;

pub struct WaitInAnotherThread {
    end_time: DateTime<Utc>,
    running: bool,
}

impl WaitInAnotherThread {
    pub fn new(how_long: Duration) -> WaitInAnotherThread {
        WaitInAnotherThread {
            end_time: Utc::now() + how_long,
            running: false,
        }
    }

    fn run(&mut self, task: task::Task) {
        let lend = self.end_time;

        spawn(move || {
            while Utc::now() < lend {
                let delta_sec = lend.timestamp() - Utc::now().timestamp();
                if delta_sec > 0 {
                    sleep(::std::time::Duration::from_secs(delta_sec as u64));
                }
                task.notify();
            }
            println!("the time has come == {:?}!", lend);
        });
    }
}

impl Future for WaitInAnotherThread {
    type Item = ();
    type Error = Box<Error>;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        if Utc::now() < self.end_time {
            println!("not ready yet! parking the task.");

            if !self.running {
                println!("side thread not running! starting now!");
                self.run(task::current());
                self.running = true;
            }

            Ok(Async::NotReady)
        } else {
            println!("ready! the task will complete.");
            Ok(Async::Ready(()))
        }
    }
}

fn main() {
    let mut reactor = Core::new().unwrap();

    let wiat = WaitInAnotherThread::new(Duration::seconds(3));
    println!("wait future started");
    let ret = reactor.run(wiat).unwrap();
    println!("wait future completed. ret == {:?}", ret);
}

Conclusion

到目前未知我们完整的实现了没有阻塞的real life future. 所以也没有浪费CPU资源。除了这个例子你还能想到与此相同的应用场景吗?
尽管RUST早都有现成的Crate帮我们实现好了。但是了解其中的工作原理还是对我们有很大的帮助。

下一个主题将是Streams,目标是:创建一个不会阻塞ReactorIterators.

深入浅出Rust Future – Part3 – The Reactor

译自Rust futures: an uneducated, short and hopefully not boring tutorial - Part 3 - The reactor

Intro

在这篇文章中我们将会讨论和阐释Reactor是如何工作的.在上篇文章中我们,我们频繁的使用Reactor来执行我们的Future,但是并没有阐述它是如何工作的。现在是时候阐明它了。

Reactor? Loop?

如果用一句话来描述Reactor,那应该是:

Reactor是一个环(Loop)

举个栗子:
你决定通过Email邀请你喜欢的女孩或者男孩(emmm, 这个栗子听起来很老套), 你怀着忐忑的心将这份邮件发送出去,心里焦急着等待着, 不停的一遍又一遍的检查你的邮箱是否有新的回复. 直到收到回复。
Rust's Reactor就是这样, 你给他一个future, 他会不断的检查,直到这个future完成(或者返回错误). Reactor通过调用程序员实现的Poll函数,来检查Future是否已完成。你所要做的就是实现future poll 并且返回Poll<T, E>结构。但是 Reactor也不会无休止的对你的future function轮询。

A future from scratch

为了让我们能更容易理解Reactor知识,我们还是从零开始实现一个Future. 换句话说就是,我们将动手实现Future Trait.

#[derive(Debug)]
struct WaitForIt {
    message: String,
    until: DateTime<Utc>,
    polls: u64,
}

我们的结构体字段也很简单:

  • message: 自定义字符串消息体
  • polls: 轮循次数
  • util: 等待时间

我们还会实现 WaitFotIt结构体的new方法.这个方法作用是初始化WaitForIt

impl WaitForIt {
    pub fn new(message: String, delay: Duration) -> WaitForIt {
        WaitForIt {
            polls: 0,
            message: message,
            until: Utc::now() + delay,
        }
    }
}

impl Future for WaitForIt {
    type Item = String;
    type Error = Box<Error>;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let now = Utc::now();
        if self.until < now {
            Ok(Async::Ready(
                format!("{} after {} polls!", self.message, self.polls),
            ))
        } else {
            self.polls += 1;

            println!("not ready yet --> {:?}", self);
            Ok(Async::NotReady)
        }
    }
}

让我们逐步解释

    type Item = String;
    type Error = Box<Error>;

上面两行在RUST里被叫做associated types, 意思就是Future在将来完成时返回的值(或者错误).

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {}

定义轮询的方法。Self::Item, Self::Error 是我们定义的associated types站位符。在我们的例子中,该方法如下:

fn poll(&mut self) - > Poll <String,Box <Error >>

现在看看我们的逻辑代码:

let now = Utc::now();
if self.until < now {
// 告诉reactor `Future` 已经完成了!
} else {
// 告诉 reactor `Future` 还没准备好,过会儿再来。
}

Rust里我们该怎样告诉Reactor某个Future已经完成了?很简单使用枚举

Ok(Async::NotReady(.......)) // 还没完成
Ok(Async::Ready(......)) // 完成了

让我们来实现上述的方法:

impl Future for WaitForIt {
    type Item = String;
    type Error = Box<Error>;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let now = Utc::now();
        if self.until < now {
            Ok(Async::Ready(
                format!("{} after {} polls!", self.message, self.polls),
            ))
        } else {
            self.polls += 1;

            println!("not ready yet --> {:?}", self);
            Ok(Async::NotReady)
        }
    }
}

为了让这段代码运行起来我们还需要:

extern crate chrono;
extern crate futures;

extern crate tokio_core;

use futures::done;
use futures::prelude::*;
use futures::future::{err, ok};
use tokio_core::reactor::Core;
use std::error::Error;
use futures::prelude::*;
use futures::*;
use chrono::prelude::*;
use chrono::*;

fn main() {
    let mut reactor = Core::new().unwrap();

    let wfi_1 = WaitForIt::new("I'm done:".to_owned(), Duration::seconds(1));
    println!("wfi_1 == {:?}", wfi_1);

    let ret = reactor.run(wfi_1).unwrap();
    println!("ret == {:?}", ret);
}

运行!! 等待一秒我们将会看到结果:

Running `target/debug/tst_fut_create`
wfi_1 == WaitForIt { message: "I\'m done:", until: 2017-11-07T16:07:06.382232234Z, polls: 0 }
not ready yet --> WaitForIt { message: "I\'m done:", until: 2017-11-07T16:07:06.382232234Z, polls: 1 }

emmm~, 只运行一次就被卡住了, 但是没有额外的消耗CPU.但是为什么会这样?

如果不明确告诉ReactorReactor是不会再次轮询停放(park)给它的Future.

(- 译注: Park: 翻译成停放其实也挺好的,就像车场的停车位一样.)

在我们的例子里, Reactor会立即执行我们停放的Future方法, 当我们返回Async::NotReady, 它就会认为当前停放的Future还未完成。如果我们不主动去解除停放,Reactor永远也不会再次调用。

空闲中的Reactor是不会消耗CPU的。这样看起来Reactor效率还是很高的。
在我们的电子邮件示例中,我们可以避免手动检查邮件并等待通知。 所以我们可以在此期间自由玩Doom。(emm~看来作者很喜欢这款游戏).

另一个更有意义的示例可能是从网络接收数据。 我们可以阻止我们的线程等待网络数据包,或者我们等待时可以做其他事情。 您可能想知道为什么这种方法比使用OS线程更好?

Unparking

我们该如何纠正我们例子?我们需要以某种方式取消我们的Future。 理想情况下,我们应该有一些外部事件来取消我们的Future(例如按键或网络数据包),但是对于我们的示例,我们将使用这个简单的行手动取消停放

futures::task::current().notify();

像这样:

impl Future for WaitForIt {
    type Item = String;
    type Error = Box<Error>;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let now = Utc::now();
        if self.until < now {
            Ok(Async::Ready(
                format!("{} after {} polls!", self.message, self.polls),
            ))
        } else {
            self.polls += 1;

            println!("not ready yet --> {:?}", self);
            futures::task::current().notify();
            Ok(Async::NotReady)
        }
    }
}

现在代码完成了。 请注意,在我的情况下,该函数已被调用超过50k次, CPU占用也很高!
这是严重的浪费,也清楚地说明你为什么需要在某个合理的时间点去Unpark Future.( That's a waste of resources and clearly demonstrates why you should unpark your future only when something happened. )

另请注意循环如何仅消耗单个线程。 这是设计和效率的来源之一。 当然,如果需要,您可以使用更多线程。

Joining

Reactor可以同时运行多个Future,这也是他为什么如此有效率的原因. 那么我们该如何充分利用单线程: 当一个Future被停放的时候, 另一个可以继续工作。

对于这个例子,我们将重用我们的WaitForIt结构。 我们只是同时调用两次。 我们开始创建两个Future的实例:

let wfi_1 = WaitForIt::new("I'm done:".to_owned(), Duration::seconds(1));
println!("wfi_1 == {:?}", wfi_1);
let wfi_2 = WaitForIt::new("I'm done too:".to_owned(), Duration::seconds(1));
println!("wfi_2 == {:?}", wfi_2);

现在我们来调用futures::future::join_all, 他需要一个vec![]迭代器, 并且返回枚举过的Future

let v = vec![wfi_1, wfi_2];
let sel = join_all(v);

我们重新实现的代码像这样:

fn main() {
    let mut reactor = Core::new().unwrap();

    let wfi_1 = WaitForIt::new("I'm done:".to_owned(), Duration::seconds(1));
    println!("wfi_1 == {:?}", wfi_1);
    let wfi_2 = WaitForIt::new("I'm done too:".to_owned(), Duration::seconds(1));
    println!("wfi_2 == {:?}", wfi_2);

    let v = vec![wfi_1, wfi_2];

    let sel = join_all(v);

    let ret = reactor.run(sel).unwrap();
    println!("ret == {:?}", ret);
}

这里的关键点是两个请求是交错的:第一个Future被调用,然后是第二个,然后是第一个,依此类推,直到两个完成。 如上图所示,第一个Future在第二个之前完成。 第二个在完成之前被调用两次。

Select

Future的特性还有很多功能。 这里值得探讨的另一件事是select函数。 select函数运行两个(或者在select_all的情况下更多)Future,并返回第一个完成。 这对于实现超时很有用。 我们的例子可以简单:

fn main() {
    let mut reactor = Core::new().unwrap();

    let wfi_1 = WaitForIt::new("I'm done:".to_owned(), Duration::seconds(1));
    println!("wfi_1 == {:?}", wfi_1);
    let wfi_2 = WaitForIt::new("I'm done too:".to_owned(), Duration::seconds(2));
    println!("wfi_2 == {:?}", wfi_2);

    let v = vec![wfi_1, wfi_2];

    let sel = select_all(v);

    let ret = reactor.run(sel).unwrap();
    println!("ret == {:?}", ret);
}

Closing remarks

下篇将会创建一个更RealFuture.

可运行的代码

extern crate chrono;
extern crate futures;

extern crate tokio_core;

use futures::done;
use futures::prelude::*;
use futures::future::{err, ok};
use tokio_core::reactor::Core;
use std::error::Error;
use futures::prelude::*;
use futures::*;
use chrono::prelude::*;
use chrono::*;
use futures::future::join_all;
#[derive(Debug)]
struct WaitForIt {
    message: String,
    until: DateTime<Utc>,
    polls: u64,
}

impl WaitForIt {
    pub fn new(message: String, delay: Duration) -> WaitForIt {
        WaitForIt {
            polls: 0,
            message: message,
            until: Utc::now() + delay,
        }
    }
}

impl Future for WaitForIt {
    type Item = String;
    type Error = Box<Error>;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let now = Utc::now();
        if self.until < now {
            Ok(Async::Ready(
                format!("{} after {} polls!", self.message, self.polls),
            ))
        } else {
            self.polls += 1;

            println!("not ready yet --> {:?}", self);
            futures::task::current().notify();
            Ok(Async::NotReady)
        }
    }
}

fn main() {
    let mut reactor = Core::new().unwrap();

    let wfi_1 = WaitForIt::new("I'm done:".to_owned(), Duration::seconds(1));
    println!("wfi_1 == {:?}", wfi_1);
    let wfi_2 = WaitForIt::new("I'm done too:".to_owned(), Duration::seconds(1));
    println!("wfi_2 == {:?}", wfi_2);

    let v = vec![wfi_1, wfi_2];

    let sel = join_all(v);

    let ret = reactor.run(sel).unwrap();
    println!("ret == {:?}", ret);
}

深入浅出Rust Future – Part 2

译自Rust futures: an uneducated, short and hopefully not boring tutorial - Part 2

Intro

在这个系列的第一篇文章我们了解了如何使用Rust Future.但是只有我们彻底的了解Future并且操作得当才能发挥它真正的作用。这个系列的第二篇文章,我们将介绍如何避免Future里常见的陷阱。

Error troubles

我们将Future组织成一个很简单,只要通过Rust Future提供的and_then函数就可以了。但是在上一篇文章中我们使用了Box<Error> trait作为错误类型,绕过了编译器的检查。为什么我们没有使用更为详细的错误类型?原因很简单, 每个Future函数的错误返回都有可能不同.

原则1: 当我们将不同的Future组织成一个调用时,每个Future都应该返回相同的Error Type.

让我们一起来证明一下这一点.

我们有两个被叫做ErrorAErrorBError类型, 我们将会实现error::Error trait,尽管这并不是编译器必须让我们做的(但是这是一个好习惯[在我看来这应该算是一个最佳实践]),在我们实现error::Error trait的同时还需要实现std::fmt::Display,现在就让我们一起实现他吧!

#[derive(Debug, Default)]
pub struct ErrorA {}

impl fmt::Display for ErrorA {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "ErrorA!")
    }
}

impl error::Error for ErrorA {
    fn description(&self) -> &str {
        "Description for ErrorA"
    }

    fn cause(&self) -> Option<&error::Error> {
        None
    }
}

// Error B
#[derive(Debug, Default)]
pub struct ErrorB {}

impl fmt::Display for ErrorB {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "ErrorB!")
    }
}

impl error::Error for ErrorB {
    fn description(&self) -> &str {
        "Description for ErrorB"
    }

    fn cause(&self) -> Option<&error::Error> {
        None
    }
}

我尽量用简单的方式去实现Error Trait,这样可以排除别的干扰来证明我的观点. 现在让我们在Future中使用ErrorAErrorB.

fn fut_error_a() -> impl Future<Item = (), Error = ErrorA> {
    err(ErrorA {})
}

fn fut_error_b() -> impl Future<Item = (), Error = ErrorB> {
    err(ErrorB {})
}

现在让我们在main函数里调用它.

let retval = reactor.run(fut_error_a()).unwrap_err();
println!("fut_error_a == {:?}", retval);

let retval = reactor.run(fut_error_b()).unwrap_err();
println!("fut_error_b == {:?}", retval);

跟我们所预见的结果一致:

fut_error_a == ErrorA
fut_error_b == ErrorB

到现在为止还挺好的,让我们把ErrorAErrorB打包成一个调用链:

let future = fut_error_a().and_then(|_| fut_error_b());

我们先调用fut_error_a然后再调用fut_error_b,我们不用关心fut_error_a的返回值所以我们用_省略不用. 用更复杂的术语解释就是: 我们将impl Future<Item=(), Error=ErrorA>impl Future<Item=(), Error=ErrorB>打包成调用链.

现在让我们尝试编译这段代码:


Compiling tst_fut2 v0.1.0 (file:///home/MINDFLAVOR/mindflavor/src/rust/tst_future_2)
error[E0271]: type mismatch resolving `<impl futures::Future as futures::IntoFuture>::Error == errors::ErrorA`
   --> src/main.rs:166:32
    |
166 |     let future = fut_error_a().and_then(|_| fut_error_b());
    |                                ^^^^^^^^ expected struct `errors::ErrorB`, found struct `errors::ErrorA`
    |
    = note: expected type `errors::ErrorB`
               found type `errors::ErrorA`

这个报错非常明显, 编译器期待我们使用ErrorB但是我们给了一个ErrorA

原则2: 当我们组织Future Chain时,第一个错误类型必须与最后一个future返回的错误类型一致.(When chaining futures, the first function error type must be the same as the chained one.)

rustc已经非常明确的告诉我们了. 这个Future chain最终返回的是ErrorB所以我们第一个函数也应该返回ErrorB. 在上述代码我们返回了ErrorA, 所以导致编译失败.

我们改如何处理这个问题?非常幸运的是, 我们可以使用Rust Future给我们提供的map_err方法. 在我们的示例中,我们想要把ErrorA转换成ErrorB,所以我们只需要在ErrorAErrorB之间调用这个函数就行了.

let future = fut_error_a()
    .map_err(|e| {
        println!("mapping {:?} into ErrorB", e);
        ErrorB::default()
    })
    .and_then(|_| fut_error_b());

let retval = reactor.run(future).unwrap_err();
println!("error chain == {:?}", retval);

如果我们现在编译并运行示例,将会输出:

mapping ErrorA into ErrorB
error chain == ErrorB

让我们进一步推动这个例子.假设我们想连接ErrorA,然后是ErrorB,然后再连接ErrorA。 就像是:

let future = fut_error_a()
    .and_then(|_| fut_error_b())
    .and_then(|_| fut_error_a());

我们最初的解决方式只适合成对的future, 并没有考虑其他的情况。所以在上面代码中我们不得不这么做:
ErrorA => ErrorB => ErrorA.就像这样:

let future = fut_error_a()
    .map_err(|_| ErrorB::default())
    .and_then(|_| fut_error_b())
    .map_err(|_| ErrorA::default())
    .and_then(|_| fut_error_a());

看上去不那么优雅但是还是解决了多个Future的错误处理.

"From" to the rescue

简化上述代码的一种简单的方式就是利用std::covert::From. 当我们实现From, 这样编译器就可以自动的将一个结构软换为另一个结构.现在让我们实现From<ErrorA> for ErrorBFrom<ErrorB> for ErrorA.

impl From<ErrorB> for ErrorA {
    fn from(e: ErrorB) -> ErrorA {
        ErrorA::default()
    }
}

impl From<ErrorA> for ErrorB {
    fn from(e: ErrorA) -> ErrorB {
        ErrorB::default()
    }
}

通过上述的实现我们只需要用from_err函数来代替map_err就好了。

let future = fut_error_a()
   .from_err()
   .and_then(|_| fut_error_b())
   .from_err()
   .and_then(|_| fut_error_a());

现在的代码仍然与错误转换混合, 但转换代码不再是内联的,而且代码可读性也提高了。Futrue Crate非常聪明:只有在错误的情况下才会调用from_err代码, 因此在不使用from_err时, 也不会在Runtime时产生额外的开销.

Lifetimes

Rust签名功能是引用的显式生命周期注释. 但是,大多数情况下,Rust允许我们避免使用生命周期省略来指定生命周期.让我们看看它的实际效果. 我们想编写一个带字符串引用的函数,如果成功则返回相同的字符串引用:

fn my_fn_ref<'a>(s: &'a str) -> Result<&'a str, Box<Error>> {
    Ok(s)
}

注意代码中 <'a> 的部分, 意思是我们显示的声明一个生命周期. 接着我们声明了一个引用形参s: &'a str, 这个参数必须在'a生命周期有效的情况下使用.使用Result <&'str,Box <Error >>,我们告诉Rust我们的返回值将包含一个字符串引用.只要'a有效,该字符串引用必须有效.换句话说,传递的字符串引用和返回的对象必须具有相同的生命周期.这会导致我们的语法非常冗长,以至于Rust允许我们避免在常见情况下指定生命周期。 所以我们可以这样重写函数:

fn my_fn_ref(s: &str) -> Result<&str, Box<Error>> {
    Ok(s)
}

但是在Future中你不能这样写, 让我们来尝试用Future方式复写这个函数:

fn my_fut_ref_implicit(s: &str) -> impl Future<Item = &str, Error = Box<Error>> {
    ok(s)
}

编译将会失败(rustc 1.23.0-nightly (2be4cc040 2017-11-01):

Compiling tst_fut2 v0.1.0 (file:///home/MINDFLAVOR/mindflavor/src/rust/tst_future_2)
error: internal compiler error: /checkout/src/librustc_typeck/check/mod.rs:633: escaping regions in predicate Obligation(predicate=Binder(ProjectionPredicate(ProjectionTy { substs: Slice([_]), item_def_id: DefId { krate: CrateNum(15), index: DefIndex(0:330) => futures[59aa]::future[0]::Future[0]::Item[0] } }, &str)),depth=0)
  --> src/main.rs:39:36
   |
39 | fn my_fut_ref_implicit(s: &str) -> impl Future<Item = &str, Error = Box<Error>> {
   |                                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

note: the compiler unexpectedly panicked. this is a bug.

note: we would appreciate a bug report: https://github.com/rust-lang/rust/blob/master/CONTRIBUTING.md#bug-reports

note: rustc 1.23.0-nightly (2be4cc040 2017-11-01) running on x86_64-unknown-linux-gnu

thread 'rustc' panicked at 'Box<Any>', /checkout/src/librustc_errors/lib.rs:450:8
note: Run with `RUST_BACKTRACE=1` for a backtrace.

当然也有解决方式,我们只要显示声明一个有效的生命周期就行了:

fn my_fut_ref<'a>(s: &'a str) -> impl Future<Item = &'a str, Error = Box<Error>> {
    ok(s)
}

impl Future with lifetimes

Future中如果有引用传参我们必须要显示的注释生命周期. 举个例子, 我们希望使用&s的值并且返回的是一个没有引用的String.我们必须显示的注释生命周期:

fn my_fut_ref_chained<'a>(s: &'a str) -> impl Future<Item = String, Error = Box<Error>> {
    my_fut_ref(s).and_then(|s| ok(format!("received == {}", s)))
}

上面的代码将会报错:

error[E0564]: only named lifetimes are allowed in `impl Trait`, but `` was found in the type `futures::AndThen<impl futures::Future, futures::FutureResult<std::string::String, std::boxed::Box<std::error::Error + 'static>>, [closure@src/main.rs:44:28: 44:64]>`
  --> src/main.rs:43:42
   |
43 | fn my_fut_ref_chained<'a>(s: &'a str) -> impl Future<Item = String, Error = Box<Error>> {
   |                                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

为了解决这个错误我们必须为impl Future追加一个'a生命周期:

fn my_fut_ref_chained<'a>(s: &'a str) -> impl Future<Item = String, Error = Box<Error>> + 'a {
    my_fut_ref(s).and_then(|s| ok(format!("received == {}", s)))
}

现在你可以运行这段代码了:

let retval = reactor
    .run(my_fut_ref_chained("str with lifetime"))
    .unwrap();
println!("my_fut_ref_chained == {}", retval);

Closing remarks

在下一篇文章中,我们将介绍Reactor。 我们还将从头开始编写未来的实现结构。

深入浅出Rust Future – Part 1

译自Rust futures: an uneducated, short and hopefully not boring tutorial - Part 1

Intro

如果你是一个程序员并且也喜欢Rust这门语言, 那么你应该经常在社区听到讨论Future 这个库的声音, 一些很优秀的Rust Crates都使用了Future 所以我们也应该对它有足够的了解并且使用它. 但是大多数程序员很难理解Future到底是怎么工作的, 当然有官方 Crichton's tutorial这样的教程, 虽然很完善, 但我还是很难理解并把它付诸实践.

我猜测我并不是唯一一个遇到这样问题的程序员, 所以我将分享我自己的最佳实践, 希望这能帮助你理解这个话题.

Futures in a nutshell

Future 是一个不会立即执行的特殊functions. 他会在将来执行(这也是他被命名为future的原因).我们有很多理由让future functions来替代std functions,例如: 优雅性能可组合性.future的缺点也很明显: 很难用代码去实现. 当你不知道何时会执行某个函数时, 你又怎么能理解他们之间的因果关系呢?

处于这个原因, Rust会试图帮助我们这些菜鸟程序员去理解和使用future这个特性。

Rust's futures

Rust 的futures 总是一个Results: 这意味着你必须同时指定期待的返回值和备用的错误类型。
让我们先简单的实现一个方法,然后把它改造成future. 我们设计的这个方法返回值是 u32 或者是一个 被Box包围着的Error trait, 代码如下所示:

fn my_fn() -> Result<u32, Box<Error>> {
    Ok(100)
}

这段代码很简单,看起来并没有涉及到future. 接下来让我们看看下面的代码:

fn my_fut() -> impl Future<Item = u32, Error = Box<Error>> {
    ok(100)
}

注意这两段代码不同的地方:

  1. 返回的类型不再是Result而是一个impl Future. Rust Nightly版本是允许我们返回一个future的。
  2. 第二个函数返回值的参量Item = u32, Error = Box<Error>较第一个函数来看更加详细明确。

为了能让第二段代码工作 你需要使用拥有conservative_impl_trait特性的nightly版本。当然,如果不嫌麻烦,你可以使用boxed trait来替代。

另请注意第一个函数返回值使用的是大写的Ok(100)。 在Result函数中,我们使用大写的Ok枚举,而future我们使用小写的ok方法.

规则: 在Rustfuture中使用小写返回方法ok(100).

好了现在我们改造完毕了,但是我们该怎样执行第二个我们改造好的方法?标准方法我们可以直接调用,但是这里需要注意的是地一个方法返回值是一个Result, 所以我们需要使用unwrap()来获取我们期待的值。

let retval = my_fn().unwrap();
println!("{:?}", retval);

由于future在实际执行之前返回(或者更准确的说, 返回的是我们将来要执行的代码), 我们需要一种途径去执行future。为此我们使用Reactor。我们只需要创建一个Reactor并且调用他的run方法就可以执行future. 就像下面的代码:

let mut reactor = Core::new().unwrap();

let retval = reactor.run(my_fut()).unwrap();
println!("{:?}", retval);

注意这里我们unwrap的是run方法,而不是my_fut.
看起来真的很简单。

Chaining

future一个很重要的特性就是能够把其他的future组织起来形成一个chain. 举个栗子:

你邀请你的父母一起吃晚饭通过email.
你在电脑前等待他们的回复
父母同意与你一起吃晚饭(或者因为一些原因拒绝了)。

Chaining就是这样的,让我们看一个简单的例子:

fn my_fn_squared(i: u32) -> Result<u32, Box<Error>> {
    Ok(i * i)
}

fn my_fut_squared(i: u32) -> impl Future<Item = u32, Error = Box<Error>> {
    ok(i * i)
}

现在我们可以使用下面的方式去调用这两个函数:

let retval = my_fn().unwrap();
println!("{:?}", retval);

let retval2 = my_fn_squared(retval).unwrap();
println!("{:?}", retval2);

当然我们也可以模拟Reactor来执行相同的代码:

let mut reactor = Core::new().unwrap();

let retval = reactor.run(my_fut()).unwrap();
println!("{:?}", retval);

let retval2 = reactor.run(my_fut_squared(retval)).unwrap();
println!("{:?}", retval2);

但还有更好的方法,在Rust中future也是一个trait他有很多种方法(这里我们会介绍些),其中一个名为and_then的方法,在语义上完全符合我们最后写的代码片段。但是没有显式的执行Reactor Run, 让我们一起来看看下面的代码:

let chained_future = my_fut().and_then(|retval| my_fut_squared(retval));
let retval2 = reactor.run(chained_future).unwrap();
println!("{:?}", retval2);

让我们看看第一行:创建一个被叫做chained_futurefuture, 它把my_futmu_fut_squaredfuture串联了起来。
这里让人难以理解的部分是: 我们如何将上一个future的结果传递给下一个future?

在Rust中我们可以通过闭包来捕获外部变量来传递future的值。

可以这样想:

  1. 调度并且执行my_fut()
  2. my_fut()执行完毕后,创建一个retval变量并且将my_fut()的返回值存到其中。
  3. 现在将retval作为my_fn_squared(i: u32)的参数传递进去,并且调度执行my_fn_squared
  4. 把上面一些列的操作打包成一个名为chained_future的调用链。

第二行代码,与之前的相同: 我们调用Reactor run(), 要求执行chained_future并给出结果。

当然我们可以通过这种方式将无数个future打包成一个chain, 不要去担心性能问题, 因为future chainzero cost.

RUST borrow checked可能让你的future chain 写起来不是那么的轻松,所以你可以尝试move你的参数变量.

Mixing futures and plain functions

你也可以使用普通的函数来做future chain, 这很有用, 因为不是每个功能都需要使用future. 此外, 你也有可能希望调用外部你无法控制的函数。
如果函数没有返回Result,你只需在闭包中添加函数调用即可。 例如,如果我们有这个普通函数:

fn fn_plain(i: u32) -> u32 {
    i - 50
}

let chained_future = my_fut().and_then(|retval| {
    let retval2 = fn_plain(retval);
    my_fut_squared(retval2)
});

let retval3 = reactor.run(chained_future).unwrap();
println!("{:?}", retval3);

如果你的函数返回Result则有更好的办法。我们一起来尝试将my_fn_squared(i: u32) -> Result<u32, Box<Error>方法打包进future chain

在这里由于返回值是Result所以你无法调用and_then, 但是future有一个方法done()可以将Result转换为impl Future.这意味着我们可以将普通的函数通过done方法把它包装成一个future.

let chained_future = my_fut().and_then(|retval| {
    done(my_fn_squared(retval)).and_then(|retval2| my_fut_squared(retval2))
});
let retval3 = reactor.run(chained_future).unwrap();
println!("{:?}", retval3);

注意第二:done(my_fn_squared(retval))允许我们在链式调用的原因是:我们将普通函数通过done方法转换成一个impl Future. 现在我们不使用done方法试试:

let chained_future = my_fut().and_then(|retval| {
    my_fn_squared(retval).and_then(|retval2| my_fut_squared(retval2))
});
let retval3 = reactor.run(chained_future).unwrap();
println!("{:?}", retval3);

编译不通过!

Compiling tst_fut2 v0.1.0 (file:///home/MINDFLAVOR/mindflavor/src/rust/tst_future_2)
error[E0308]: mismatched types
   --> src/main.rs:136:50
    |
136 |         my_fn_squared(retval).and_then(|retval2| my_fut_squared(retval2))
    |                                                  ^^^^^^^^^^^^^^^^^^^^^^^ expected enum `std::result::Result`, found anonymized type
    |
    = note: expected type `std::result::Result<_, std::boxed::Box<std::error::Error>>`
               found type `impl futures::Future`

error: aborting due to previous error

error: Could not compile `tst_fut2`.

expected type std::result::Result<_, std::boxed::Box<std::error::Error>> found type impl futures::Future,这个错误有点让人困惑. 我们将会在第二部分讨论它。

Generics

最后但并非最不重要的, futuregeneric(这是啥玩意儿啊)一起工作不需要任何黑魔法.

fn fut_generic_own<A>(a1: A, a2: A) -> impl Future<Item = A, Error = Box<Error>>
where
    A: std::cmp::PartialOrd,
{
    if a1 < a2 {
        ok(a1)
    } else {
        ok(a2)
    }
}

这个函数返回的是 a1 与 a2之间的较小的值。但是即便我们很确定这个函数没有错误也需要给出Error,此外,返回值在这种情况下是小写的ok(原因是函数, 而不是enmu)

现在我们调用这个future:

let future = fut_generic_own("Sampdoria", "Juventus");
let retval = reactor.run(future).unwrap();
println!("fut_generic_own == {}", retval);

阅读到现在你可能对future应该有所了解了, 在这边文章里你可能注意到我没有使用&, 并且仅使用函数自身的值。这是因为使用impl Future,生命周期的行为并不相同,我将在下一篇文章中解释如何使用它们。在下一篇文章中我们也会讨论如何在future chain处理错误和使用await!()宏。

创建一个简单的k8s应用

创建K8S应用

前言

从创建Docker Container开始一步一步给大家讲述如何创建自己的K8S应用.看懂这篇操作手册你可能需要
了解:

  1. Docker 命令的使用
  2. kubectl 命令的使用
  3. Yaml的使用

    概念

Docker
  • Docker Registry是什么?
    • 他是存储docker镜像的仓库, 如果你熟悉git,那么上手这个也很快, 你需要了解到的几个命令:
    • docker push (上传镜像)
    • docker pull (拉取镜像)

常用的仓储有 https://index.docker.io

  • Docker Container 与 Docker Image 之间的关系?

    • 所有的Docker Container 都是以 Docker Image为蓝本创建的
    • Docker Image 可以理解为一个还没装到你电脑上的Win7系统镜像,Docker Container是已经装到你的电脑上的系统.
  • 如何基于Dockerfile创建一个Docker Container?

    # Docker Hub 拉取ubuntu 基础镜像,并以它为蓝本创建自己的镜像
    FROM ubuntu
    # 维护者
    MAINTAINER docker_user docker_user@email.com
    # 更新ubuntu系统
    RUN echo "deb http://archive.ubuntu.com/ubuntu/ raring main universe" >> /etc/apt/sources.list
    # 安装Nginx
    RUN apt-get update && apt-get install -y nginx
    # 为了保证容器与nginx生命周期一致,所有的程序不建议用background的方式运行
    RUN echo "\ndaemon off;" >> /etc/nginx/nginx.conf
    # Docker Container 运行时需要执行的命令
    CMD /usr/sbin/nginx
K8S
  • POD 是什么?

    • 是K8S里最小可运行的单元,一个POD里至少需要一个Docker Container。
  • Service 是什么?

    Container需要对外服务,以Nginx为例子,来描述Service创建过程

    1. Container 暴露一个端口映射到POD端口上(K8S会分给POD一个内部的Cluster IP)
    2. POD 暴露出相应的端口。
    3. Service 通过K8S的Label标签功能,发现POD,以及POD暴露出来的端口
  • Ingress 是什么

    • Ingress是一个互联网入口,可以看做一个简单的Nginx,Ingress通过KUBE-PROXY将外部访问流量引导至Service上

创建流程

main.go

package main

import (
    "net/http"
)

func SayHello(w http.ResponseWriter, req *http.Request) {
    w.Write([]byte("Hello World"))
}

func main() {
    http.HandleFunc("/hello", SayHello)
    http.ListenAndServe(":8001", nil)

}
创建基准Docker Image
# 以BusyBox做为基准镜像创建我们自己的Docker Image
FROM busybox
# 设置工作目录
WORKDIR /go/src/app
# 把当前目录的二进制文件放到Docker Images
COPY . .
# 纠正时间
RUN  cp -r -f Shanghai /etc/localtime && echo 'Asia/Shanghai' >/etc/timezone 
# 运行时执行的命令
CMD ["/go/src/app/helloworld"]
创建POD

deployment.yaml

# 使用k8s哪一个版本的API
apiVersion: extensions/v1beta1 
# 以Deployment方式创建POD, 这里有很多类型以后有机会再讲
kind: Deployment
metadata:
  # 创建的POD名字K8S内部会hash这个值
  name: helloworld
  # 指定在哪个namespace下创建
  namespace: frm
spec:
  # 部署副本数量
  replicas: 3
  # 保留历史版本的副本数量的上限值
  revisionHistoryLimit: 5
  template:
    metadata:
      labels:
        # Service 通过这个值讲 SVC与POD关联上
        app: helloworld
        version: production
    spec:
      containers:
      # 创建的ContainerName
      - name: helloworld
        # docker image 地址
        image: helloworld:v1.3
        # 触发滚动更新的规则
        imagePullPolicy: IfNotPresent
        # 资源限制
        resources:
          limits:
            cpu: 80m
            memory: 80Mi
          requests:
            cpu: 20m
            memory: 20Mi 
        # container 暴露出来的端口
        ports:
        - name: helloworld-port
          containerPort: 80
        # 挂载目录  
        volumeMounts:
        - mountPath: /go/src/app/logs
          name: log  
     # 指定该POD卷挂载到宿主机目录
      volumes:
      - name: log
        hostPath:
          path: /home/data/logs/helloworld # 宿主及目录
          type: Directory
创建Service

service.yaml

apiVersion: v1
kind: Service
metadata:
  name: helloworld-srv
  namespace: frm
  labels:
    # Service 的Label, Ingress通过这个来关联Service
    app: helloworld-srv
    version: production
spec:
  type: ClusterIP
  selector:
    # 选择一个POD Label
    app: helloworld
    version: production
  # 选择POD暴露的端口  
  ports:
    - name: http
      port: 80
创建Ingress

ingress.yaml

apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: helloworld-ingress
  namespace: frm
  labels:
    app: helloworld-ingress
spec:
  rules:
  ### 指定域名
  - host: helloworld.test
    http:
      paths:
      - path: /
        backend:
          # 选择绑定哪个service
          serviceName: helloworld-srv
          servicePort: 80
创建命令

kubectl create -f deployment.yaml service.yaml ingress.yaml

流程

User Request -> Ingress Port-> Service -> Pod -> Container

问题排查

  1. 构建Docker Image后先自己 docker run 一下来确认构建是否是成功的!

  2. 创建失败 ,查看POD创建状态

  • kubectl describe pods POD_NAME -n NAMESPACE
  1. 创建失败 ,查看POD LOG

    • kubectl log POD_NAME -n NAMESPACE
  2. POD启动成功但是无法访问

    • kubectl get svc,ingresss,pod -n NAMESPACE
  3. 查看service的状态

    • kubectl exec -it POD_NAME -n NAMESPACE -c CONTAINER_NAME,进入有问题的容器看看

从零创建一个 Kubernetes Web 应用

前言

上一篇文章简单的介绍了Kubernetes内部的负载均衡原理,有朋友在群里反映不要一上来就将原理,想了想也是,那我就从如何创建一个PHP Web应用入手,带大家进入Kubernetes的世界。


## 基础
### 环境
- CentOS 7.5 (Kernel 3.10)
- Minikube (Kubernetes 1.10.0)
### 对你的要求
我假设你已经掌握了下面的基础技能:
- Docker && 会写Dockerfile
- 如何Google
- 拥有一个DockerHub账号
- 手动编译过LNMP或者LAMP
# 构建基础镜像
![alt](http://testvplpic170904.ufile.ucloud.com.cn/Container.png)
上图描述了我们需要创建的Containers,其中Pause ContainerKubernetes自带的所以我们不用关心,但是十分重要,未来将会有一篇文章来描述Pause Container到底干什么的。
其实基础镜像一般用官方现成的就行了,但是在学习过程中建议还是手动编译一下,了解下官方默认配置有哪些坑。Dockerfile代码我会放到GitHub上, 因为在这里展示实在是太长了。
## 创建Nginx镜像
Nginx: [Nginx For K8S GitHub Repo](https://github.com/motecshine/nginx1.12-for-k8s)
### 编译Nginx镜像
```Shell
docker build . -t motecshine/nginx1.12-for-k8s:v0.1.0
docker push motecshine/nginx1.12-for-k8s:v0.1.0
```
## 创建PHP-FPM镜像
FPM: [FPM For K8S GitHub Repo](https://github.com/motecshine/php71-for-k8s)
### 编译FPM镜像
```Shell
docker build . -t motecshine/php71-for-k8s:v0.1.0
docker push motecshine/php71-for-k8s:v0.1.0
```
> 注意事项: Dockerfile CMD 需要关闭NginxFPMdaemon特性,具体看我REPO的Dockerfile, 这样是为了保证Container生命周期与POD生命周期一致。
# 构建业务镜像
我们将基于上述镜像来创建我们的业务镜像.
## 创建Code镜像
我们基于Laravel来创建镜像。
Code: [Code For K8S GitHub Repo](https://github.com/motecshine/code-for-k8s)
### 编译Code镜像
```Shell
docker build . -t motecshine/code-for-k8s:v0.1.1
docker push motecshine/code-for-k8s:v0.1.1
```
## 创建Nginx镜像
laravel-nginx-for-k8s: [Laravel For K8S GitHub Repo](https://github.com/motecshine/laravel-nginx-for-k8s)
### 编译Nginx镜像
```Shell
docker build . -t motecshine/laravel-nginx-for-k8s:v0.1.1
docker push motecshine/laravel-nginx-for-k8s:v0.1.1
```
## 创建PHP-FPM镜像
laravel-fpm-for-k8s: [Laravel-FPM For K8S GitHub Repo](https://github.com/motecshine/laravel-fpm-for-k8s)
### 编译FPM镜像
```Shell
docker build . -t motecshine/laravel-fpm-for-k8s:v0.1.0
docker push motecshine/laravel-fpm-for-k8s:v0.1.0
```
# 构建Kubernetes应用
![整体架构](http://testvplpic170904.ufile.ucloud.com.cn/k8s.png)
整体架构如上图所示
## 构建最小化运行单元(Pod)
![alt](http://testvplpic170904.ufile.ucloud.com.cn/deployment.png)
### 创建Deployment
```yaml
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: laravel
namespace: default
spec:
replicas: 1 # 期待副本数量
template:
metadata:
labels:
app: laravel # app label
version: testing
spec:
containers:
- name: code
image: motecshine/code-for-k8s:v0.1.1
volumeMounts: # 挂载目录
- mountPath: /data2
name: code
- name: fpm
image: motecshine/laravel-fpm-for-k8s:v0.1.0
imagePullPolicy: IfNotPresent
resources: # 资源限制
limits:
cpu: 350m
memory: 350Mi
requests:
cpu: 50m
memory: 50Mi
ports:
- name: fpm
containerPort: 9000
volumeMounts:
- mountPath: /data/code # 挂载code
name: code
- mountPath: /var/log # 挂载日志
name: log
- name: laravel-nginx
image: motecshine/laravel-nginx-for-k8s:v0.1.0
imagePullPolicy: IfNotPresent
resources:
limits:
cpu: 350m
memory: 350Mi
requests:
cpu: 50m
memory: 50Mi
ports:
- name: laravel-nginx
containerPort: 80 # 暴露Endpoint
volumeMounts:
- mountPath: /data/code
name: code
- mountPath: /var/log
name: log
volumes:
- name: code
emptyDir: {}
- name: log
hostPath:
path: /var/log
type: Directory
```
## 构建Service
```yaml
apiVersion: v1
kind: Service
metadata:
name: laravel-service
namespace: default
labels:
app: laravel-service
version: testing-service
spec:
type: ClusterIP
selector:
app: laravel
version: testing
ports:
- name: http
port: 80
```
## 构建Ingress
```yaml
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
name: laravel-ingress
namespace: default
labels:
app: laravel-ingress
spec:
rules:
- host: laravel.test
http:
paths:
- path: /
backend:
serviceName: laravel-service
servicePort: 80
```
## 安装Minikube
```Shell
curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 && chmod +x minikube && sudo cp minikube /usr/local/bin/ && rm minikube
```
## 安装Traefik
我们使用开源的Ingress组件安装[参考这里](https://docs.traefik.io/user-guide/kubernetes/)
## 启动Web应用
[上面的配置文件在这里](https://github.com/motecshine/laravel-k8s-config.git)
```Shell
git clone git@github.com:motecshine/laravel-k8s-config.git
cd laravel-k8s-config && kubectl create -f .
```
### 效果
![效果](http://testvplpic170904.ufile.ucloud.com.cn/effect.png)
# 结语
简单的介绍了如何创建一个Web应用,这仅仅是个开始,Kubernetes背后是一个庞大的生态环境, CI,CD,ELK(EFK), APM,让我们一点点揭开它神秘的面纱。
这里挂载日志到Host Path 会有并发写入的问题, 下一篇将Kubenetes基于EFK日志收集平台,并且给出这个问题的解决方案。

PHP与PHP-FPM不得不说的二三事

前言

说起PHP,大家肯定对PHP-FPM也不陌生,因为如果做Web的话经常与它打交道,公司新建blog,我来抛砖引玉,文章有错误的地方欢迎大家指正。
可能是一个系列将从 php-fpm讲起,会针对 ZendVM 词法编译,语法编译,粗暴的zend_mm和gc,zend_vm重要的数据结构(zend_array(php5 是hashtable), zval)

SAPI是什么?

SAPI 是PHP框架的接口层,他是进入PHP内部的入口,其中我们使用频率比较多的几个: CLI, PHP-FPM, ApacheHandler2(php 5.6以前经常使用, php-ng 以后从官方的标准SAPI库中移除) ,都实现了SAPI Interface。

PHP-FPM是什么?

说起FPM(FastCGI Process Manager),不得不先说说FastCGI。
FastCGI 是Web程序和处理程序之间的一种通信协议,他是与HTTP类似的一种应用层通信的协议。注意:FastCGI 是一种协议
PHP本身不像GO 那样实现HTTP协议(第三方库 Swoole 有实现),而是实现了 FastCGI协议。
FPM 就是解析和管理FastCGI Pool

PHP-FPM 工作原理

fpm.c
/*  children: return listening socket
    parent: never return */
int fpm_run(int *max_requests) /* {{{ */
{
    struct fpm_worker_pool_s *wp;

    /* create initial children in all pools */
    for (wp = fpm_worker_all_pools; wp; wp = wp->next) {
        int is_parent;

        is_parent = fpm_children_create_initial(wp);

        if (!is_parent) {
            goto run_child;
        }

        /* handle error */
        if (is_parent == 2) {
            fpm_pctl(FPM_PCTL_STATE_TERMINATING, FPM_PCTL_ACTION_SET);
            fpm_event_loop(1);
        }
    }

    /* run event loop forever */
    fpm_event_loop(0);

run_child: /* only workers reach this point */

    fpm_cleanups_run(FPM_CLEANUP_CHILD);

    *max_requests = fpm_globals.max_requests;
    return fpm_globals.listening_socket;
}
/* }}} */

三种进程管理方式

  • 静态模式:
    在启动的时候 master根据pm.max_children配置fork出相应数量地worker进程,woker的数量是固定的。

  • 动态模式(dynamic):
    这种模式应该是最常用的, fpm 启动时会根据pm.start_servers配置初始化一定数量的worker。 如果master发现空闲woker低于pm.min_spare_servers配置数则会fork出更多的woker进程,但是不会超过pm.max_spare_servers, 如果master发现了空闲的woker 大于 pm.max_spare_servers 则会杀死部分woker。

  • 按需(ondemand):
    启动时,不分配woker,请求来了 master 才会fork,但是不会超过pm.max_children。请求流程完了也不会立马kill woker,当woker的空闲时间超过pm.oricess_idle_timeout才会被杀死

工作流程

FPM 是一个多进程模型,他由一个Master进程和多个Worker进程组成。Master在初始化时会建立一个socket,但是不会接受和处理请求,而是由fork出来的子进程完成这些工作。
Master进程和Woker进程之间不会直接通信,而是通过共享内存Master知道Woker进程的信息

master 进程

从代码中可以看到 master在调用fpm_run()后不再返回,而是进入fpm_event_loop(),这个方法会循环处理注册的几个I/O事件。

multi worker 进程

woker的工作就是 争抢处理请求,争抢成功后,解析FastCGI协议,获得服务器真实的php脚本地址,然后编译执行脚本,但是woker进程是阻塞的,这样是为了简单粗暴的解决进程资源安全问题。
下面是woker执行的几个阶段:

  • 等待请求:woker进程阻塞在 fcgi_accept_request()中等待请求。

  • 解析请求: fastcgi请求到达后被,woker开始接受,然后解析请求,request数据陆续到达,直到request完整开始执行编译。

  • ZEND_VM初始化: 执行php_request_startup(), 此阶段会调用每个扩展的PHP_RINT_FUNCTTION(),并且激活GC模块等等。

  • 执行脚本: 由php_execute_script() 激活zend_vm ,编译执行j脚本。(不清楚 可以gdb attache fpm 一下 然后打个断点)

  • 收尾: 由php_request_shutdown()完成,并且调用每个扩展的PHP_RSHUTDOWN_FUNCTION()。

PHP-FPM 配置与优化

  • 不讲apache与php-fpm 我用的不多,没有深入了解过。

几个关键的配置

fpm一般有两个配置文件,fpm.conf 是针对所有的,www.conf 是针对fpm master-woker pool的

  • fpm.conf

    • error_log = log/php-fpm.log 错误日志:超时,woker不够用,php 脚本crash都会记录在这里
    • process.max = 128 控制全局的woker数量
  • www.conf

    • listen = 127.0.0.1:9000 fpm FastCGI 端口,与常用的http服务做通讯(nginx, apache, iis)

    • listen.allowed_clients = 127.0.0.1 允许访问的客户端,如果这个是127.0.0.1则 nginx或者apache 需要与php-fpm在一台主机上

    • 下面的上文讲过

    • pm = dynamic

    • pm.max_children = 5

    • pm.start_servers = 2

    • pm.min_spare_servers = 1

    • pm.max_spare_servers = 3

    • pm.process_idle_timeout = 10s;

k8s内部负载均衡原理

前言

个人理解有限,如有错误,请及时指正。

前前后后学习kubernetes已经有三个月了,一直想写一遍关于kubernetes内部实现的一系列文章来作为这三个月的总结,个人觉得kubernetes背后的架构理念以及技术会成为中大型公司架构的未来。我推荐可以先阅读下Google的Large-scale cluster management at Google with Borg技术文献,它是实现kubernetes的基石。

准备

在阐述原理之前我们需要先了解下kubernetes关于内部负载均衡的几个基础概念以及组件。

概念

Pod

alt

1.PodKubernetes创建或部署的最小/最简单的基本单位。

2.如图所示,Pod的基础架构是由一个根容器Pause Container和多个业务Container组成的。

3.根容器的IP就是Pod IP,是由kubernetesetcd中取出相应的网段分配的, Container IP是由docker分配的,同样这些IP相对应的IP网段是被存放在etcd里。

4.业务Container暴露出来端口并且映射到相应的根容器Pause Container端口,映射出来的端口叫做endpoint

5.业务Container的生命周期就是POD的生命周期,任何一个与之相关联的Container死亡,POD也应该随之消失

Service

1.Service 是定义一系列Pod以及访问这些Pod的策略的一层抽象。Service通过Label找到Pod组。因为Service是抽象的,所以在图表里通常看不到它们的存在,这也就让这一概念更难以理解。

2.Kubernetes也会分给Service一个内部的Cluster IPService通过Label查询到相应的Pod组, 如果你的Pod是对外服务的那么还应该有一组endpoint,需要将endpoint绑到Service上,这样一个微服务就形成了。

Kubernetes CNI

CNI(Container Network Interface)是用于配置Linux容器的网络接口的规范和库组成,同时还包含了一些插件。CNI仅关心容器创建时的网络分配,和当容器被删除时释放网络资源。

Ingress

1.俗称边缘节点,假如你的Service是对外服务的,那么需要将Cluster IP暴露为对外服务,这时候就需要将IngressServiceCluster IP与端口绑定起来对外服务。这样看来其实Ingress就是将外部流量引入到Kubernetes内部,这也是这篇文章重要要将的。

2.实现Ingress的开源组件有TraefikNginx-Ingress, 前者方便部署,后者部署复杂但是性能和灵活性更好。

组件

Kube-Proxy

1.Kube-Proxy是被内置在Kubernetes的插件。
2.当ServicePod Endpoint变化时,Kube-Proxy将会改变宿主机iptables, 然后配合Flannel或者Calico将流量引入Service.

Etcd

1.Etcd是一个简单的Key-Value存储工具。
2.Etcd实现了Raft协议,这个协议主要解决分布式强一致性的问题,与之相似的有Paxos, RaftPaxos要容易实现。
3.Etcd用来存储Kubernetes的一些网络配置和其他的需要强一致性的配置,以供其他组件使用。
4.如果你想要深入了解Raft, 不放先看看raft相关资料

Flannel

1.FlannelCoreOS团队针对Kubernetes设计的一个覆盖网络Overlay Network工具,其目的在于帮助每一个使用KuberentesCoreOS主机拥有一个完整的子网。
2.主要解决PODService,跨节点相互通讯的。

Traefik

1.Traefik是一个使得部署微服务更容易的现代HTTP反向代理、负载。
2.Traefik不仅仅是对Kubernetes服务的,除了Kubernetes他还有很多的Providers,如Zookeeper,Docker Swarm, Etcd等等

Traefik工作原理

授人以鱼不如授人以渔,我想通过我看源码的思路来抛砖引玉,给大家一个启发。

思考

在我要深度了解一个组件的时候通常会做下面几件事情

  • 组件扮演的角色

  • 手动编译一个版本

  • 根据语言特性来了解组件初始化流程

  • 看单元测试,了解函数具体干什么的

  • 手动触发一个流程,在关键步骤处记录日志,单步调试

Traefik初始化流程

1.在github.com/containous/traefik/cmd/traefik下由一个名为traefik.go的文件是该组件的入口。main()方法里有这样一段代码

// 加载 Traefik全局配置
traefikConfiguration := cmd.NewTraefikConfiguration()
// 加载providers的配置
traefikPointersConfiguration := cmd.NewTraefikDefaultPointersConfiguration()

...

// 加载store的配置
storeConfigCmd :=storeconfig.NewCmd(traefikConfiguration, traefikPointersConfiguration)

// 获取命令行参数
f := flaeg.New(traefikCmd, os.Args[1:])
// 解析参数
f.AddParser(reflect.TypeOf(configuration.EntryPoints{}), &configuration.EntryPoints{})
...

// 初始化Traefik
s := staert.NewStaert(traefikCmd)
// 加载配置文件
toml := staert.NewTomlSource("traefik", []string{traefikConfiguration.ConfigFile, "/etc/traefik/", "$HOME/.traefik/", "."})
...
// 启动服务
if err := s.Run(); err != nil {
    fmtlog.Printf("Error running traefik: %s\n", err)
    os.Exit(1)
}

os.Exit(0)

上面就是组件初始化流程,当我们看完初始化流程的时候应该会想到下面几个问题:

  • 当我们手动或者自动伸缩Pods时,Traefik是怎么知道的?

    假设你已经知道Kubernets是一个C/S架构,所有的组件都要通过kube-apiserver来了解其他节点或者组件的运行状态。

    当然Traefik也不例外,他是通过Kubernetes开源的Client-GoSDK来完成与kube-apiserver交互的。

    我们来找找源码:

    github.com/containous/traefik/provider/kubernetes是关于Kubernetes的源码。我们看看到底干了啥。

    // client.go
    type Client interface {
        // 检测Namespaces下的所有变动
        WatchAll(namespaces Namespaces, stopCh <-chan struct{}) (<-chan interface{}, error)
        // 获取边缘节点
        GetIngresses() []*extensionsv1beta1.Ingress
        // 获取Service
        GetService(namespace, name string) (*corev1.Service, bool, error)
        // 获取秘钥
        GetSecret(namespace, name string) (*corev1.Secret, bool, error)
        // 获取Endpoint
        GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error)
        // 更新Ingress状态
        UpdateIngressStatus(namespace, name, ip, hostname string) error
    }

    显而易见,这里通过订阅kube-apiserver,来实时的知道Service的变化,从而实时更新Traefik
    我们再来看看具体实现

    // kubernetes.go
    func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
    ...
    // 初始化一个kubernets client
    k8sClient, err := p.newK8sClient(p.LabelSelector)
    if err != nil {
        return err
    }
    ....
    // routines 连接池,这里的routines实现的很优雅,有心的同学看下
    pool.Go(func(stop chan bool) {
        operation := func() error {
            for {
                stopWatch := make(chan struct{}, 1)
                defer close(stopWatch)
                // 监视和更新namespaces下的所有变动
                eventsChan, err := k8sClient.WatchAll(p.Namespaces, stopWatch)
                ....
                for {
                        select {
                        case <-stop:
                            return nil
                        case event := <-eventsChan:
                            // 从kubernestes 那边接收到的事件
                            log.Debugf("Received Kubernetes event kind %T", event)
                            // 加载默认template配置
                            templateObjects, err := p.loadIngresses(k8sClient)
                            ...
                            // 对比最后一次的和这次的配置有什么不同
                            if reflect.DeepEqual(p.lastConfiguration.Get(), templateObjects) {
                                // 相同的话,滤过
                                log.Debugf("Skipping Kubernetes event kind %T", event)
                            } else {
                                // 否则更新配置
                                p.lastConfiguration.Set(templateObjects)
                                configurationChan <- types.ConfigMessage{
                                    ProviderName:  "kubernetes",
                                    Configuration: p.loadConfig(*templateObjects),
                                }
                            }
                        }
                }
        }
    }

    Kubernets返回给Traefik的数据结构大致是这样的:

    {"service":{"pod_name":{"domain":"ClusterIP"}}}

    看过上述的代码分析应该就对Traefik有一个大致的了解了。

Kube-Poxy工作原理

Kube-ProxyTraefik实现原理很像,都是通过与kube-apiserver的交互来完成实时更新iptables的,这里就不细说了,以后会有一篇文章专门讲
kube-dns, kube-proxy, Service的。

组件协同与负载均衡

简单描述流程,然后思考问题,最后考虑是否需要深入了解(取决于个人兴趣)

组件协同

用户通过访问Traefik提供的L7层端口, Traefik会转发流量到Cluster IPFlannel会将用户的请求准确的转发到相应的Node节点的Service上。(ps: Flannel初始化的时候宿主机会建立一个叫flannel0【这里的数字取决于你的Node节点数】的虚拟网卡)

负载均衡

上文讲述了kube-proxy是通过iptables来配合flannel完成一次用户请求的。

具体的流程我们只要看一个serviceiptables rules就知道了。

// 只截取了一小段,假设我们起了两个Pods
-A KUBE-MARK-MASQ -j MARK --set-xmark 0x4000/0x4000
// 流量跳转至 KUBE-SVC-ILP7Z622KEQYQKOB
-A KUBE-SERVICES -d 10.111.182.127/32 -p tcp -m comment --comment "pks/car-info-srv:http cluster IP" -m tcp --dport 80 -j KUBE-SVC-ILP7Z622KEQYQKOB
// 50%的几率跳转至KUBE-SEP-GDPUTEQG2YTU7YON
-A KUBE-SVC-ILP7Z622KEQYQKOB -m comment --comment "pks/car-info-srv:http" -m statistic --mode random --probability 0.50000000000 -j KUBE-SEP-GDPUTEQG2YTU7YON

// 流量转发至真正的Service Cluster IP
-A KUBE-SEP-GDPUTEQG2YTU7YON -s 10.244.1.57/32 -m comment --comment "pks/car-info-srv:http" -j KUBE-MARK-MASQ
-A KUBE-SEP-GDPUTEQG2YTU7YON -p tcp -m comment --comment "pks/car-info-srv:http" -m tcp -j DNAT --to-destination 10.244.1.57:80

可以很明显的看出来,kubernetes内部的负载均衡是通过iptablesprobability特性来做到的,这里就会有一个问题,当Pod副本数量过多时,iptables的表将会变得很大,这时会有性能问题。

总结

  • Traefik 通过默认的负载均衡(wrr)直接将流量通过Flannel送进POD.
  • kube-proxy 在没有 ipvs的情况下, 会通过iptables转发做负载均衡.

结尾

通过这篇文章我们简单的了解到内部负载均衡的机制,但是任然不够深入,你也可用通过这篇文章查漏补缺,觉得有什么错误的地方欢迎及时指正,我的邮箱shinemotec@gmail.com。下一篇将会讲KubernetesHPA工作原理。