目录Dlang 并行化好难受,dlang 生态太差,没办法,学了半天才明白。
我尽量以精炼的语言解释。
采用 定义,例子(代码),解释 的步骤讲解。
(资料图)
所以你可能看到很多代码,一点解释……
我会省略一些
import,让代码短一些
parallelism并行parallel迭代TaskasyncBufmap & amap消息并发发送消息更优雅的方式超时接受数据共享Data Race共用锁同步类同步初始化原子操作parallelism并行感觉好废物,这一小部分了解即可。
这部分只需要会
parallel和map & amap其实就差不多了。
介绍比较实用的几种方法。
parallel迭代foreach (i; parallel(range, work_uint_size = 100)) {    // do something here}其中 work_unit_size表示最多同时运行的数量。
例子:
import std.stdio, std.parallelism;import core.thread;struct Producer {    void produce() {        Thread.sleep(1.seconds);        writeln("Process +1");    }};void main() {    auto prods = new Producer[](10);    foreach (prod; parallel(prods)) {        prod.produce();    }}Task创建任务:
auto theTask = task!anOperation(arguments);// orauto theTask = task(&someFunction, parameters...)运行任务:theTask.executeInNewThread()
查看是否完成:if (theTask.done) { ... }
获取结果:auto result = theTask.yeildForce()
感觉没啥用。
并行保存多个需要长时间制作的元素。还需要保证使用的长时间的……
例子:
struct Producer {    int i, total;    bool empty() const {        return total <= i;    }    int front() const {        return i;    }    void popFront() {        writefln("Producing product ID: %d", i);        Thread.sleep(1.seconds / 2);        ++i;    }};void main() {    auto prods = Producer(0, 10);    foreach (prod; taskPool.asyncBuf(prods, 3)) {        writef("Got product id: %d\n", prod);        Thread.sleep(1.seconds);        writeln("Used product...");    }}map & amap先看例子:
int increase(int x) {    Thread.sleep(500.msecs);    return x + 3;}void main() {    int[] nums;    foreach (i; 0 .. 10) {        nums ~= i;    }    // auto results = taskPool.map!increase(nums);    auto results = taskPool.amap!increase(nums);    foreach (result; results) {        writeln(result);    }}可以类比 python中的 map。
两者的区别:
map可以指定同时运行的数量,而 amap是有多少运行多少。
map会一定程度上按顺序执行,而 amap并不是顺序执行,它依靠 RandomAccessRange,也就是随机顺序执行。
我不知道怎么翻译,反正就是
Message Passing Concurrency。
核心方法:spawn(唤起)
我们可以形象的认为,spawn方法可以唤起一个新的工人(线程)来为我们工作。
并且这个工人与主线程是分开的(先看代码后面解释):
import std.stdio;import std.concurrency;import core.thread;void worker() {    foreach (i; 0 .. 5) {        Thread.sleep(500.msecs);        writeln(i, " (worker) in ", thisTid);    }}void main() {    Tid myWorkerTid = spawn(&worker);    foreach (i; 0 .. 5) {        Thread.sleep(300.msecs);        writeln(i, " (main) in ", thisTid);    }    writeln("main is done!");}最终输出:
0 (main) in Tid(7f0eb19bc0b0)0 (worker) in Tid(7f0eb19bc000)1 (main) in Tid(7f0eb19bc0b0)2 (main) in Tid(7f0eb19bc0b0)1 (worker) in Tid(7f0eb19bc000)3 (main) in Tid(7f0eb19bc0b0)2 (worker) in Tid(7f0eb19bc000)4 (main) in Tid(7f0eb19bc0b0)main is done!3 (worker) in Tid(7f0eb19bc000)4 (worker) in Tid(7f0eb19bc000)实际输出可能略有差异。
解释:
spawn(&worker)唤起了一个新的线程运行 worker函数,并返回了新的线程的 id是一个结构体 Tid。
thisTid类似于一个宏,用于获取当前所在线程的 id。
先看代码后解释:
void worker() {    int value = 0;    while (value >= 0) {        value = receiveOnly!int();        double result = cast(double)value / 7;        ownerTid.send(result);    }}void main() {    Tid myWorker = spawn(&worker);    foreach (val; 0 .. 10) {        myWorker.send(val);        double result = receiveOnly!double();        writefln("Send %s got %s", val, result);    }    myWorker.send(-1); // terminate worker process}最终输出:
Send 0 got 0Send 1 got 0.142857Send 2 got 0.285714Send 3 got 0.428571Send 4 got 0.571429Send 5 got 0.714286Send 6 got 0.857143Send 7 got 1Send 8 got 1.14286Send 9 got 1.28571解释:
ownerTid类似于一个宏,用于取得唤醒自己的线程的 Tid,从而发送消息。
Tid.send(...)可以向 Tid代表的那个线程发送一条消息。
如果同时要发送多个东西,在发送的地方是 Tid.send(a, b, c, ...)。
在接受的地方要变化为 receiveOnly!(typeof(a), typeof(b), typeof(c), ...),最终得到的是一个 tuple,可以通过下标访问。
receiveOnly!type()表示只接受类型为 type的消息。
最后 myWorker.send(-1)是根据代码逻辑结束的,并不属于通法。
如果我们需要更灵活的接受方法怎么办?
void workerFunc() {    bool isDone = false;    while (!isDone) {        void intHandler(int message) {            writeln("handling int message: ", message);            if (message == -1) {                writeln("exiting");                isDone = true;            }        }        void stringHandler(string message) {            writeln("handling string message: ", message);        }    receive(&intHandler, &stringHandler);}}我们可以指定多种 Handler以处理不同的数据类型。利用 receive注册到处理类型消息的函数中。
处理更多的类型:
struct Exit {}void worker() {    bool done = false;    while (!done) {        receive(            (int message) {                writeln("int message ", message);            },            (string message) {                writeln("string message", message);            },            (Exit message) {                writeln("Exit message");                done = true;            },            (Variant message) {                writeln("Unexpected message: ", message);            }        );    }}void main() {    Tid myWorker = spawn(&worker);    myWorker.send(10);    myWorker.send("hello");    myWorker.send(10.1);    myWorker.send(Exit());}主要是使用了匿名函数……
解释:
利用std.variant.Variant以接收任何类型的数据。但是需要保证,处理所有类型数据的方法应该放在最后面,不然会导致全部被判断成 Variant。超时接受我们可以定一个超时时间,超过这个时间就直接返回。
先看代码:
struct Exit {}void worker() {    bool done = false;    while (!done) {        bool received = receiveTimeout(600.msecs,            (Exit message) {                writeln("Exit message");                done = true;            },            (Variant message) {                writeln("Some message: ", message);            }        );        if (!received) {            writeln("no message yet...");        }    }}void main() {    Tid myWorker = spawn(&worker);    myWorker.send(10);    myWorker.send("hello");    Thread.sleep(1.seconds);    myWorker.send(10.1);    myWorker.send(Exit());}最终输出:
Some message: 10Some message: hellono message yet...Some message: 10.1Exit message解释:
receiveTimeout只比 recieve多了一个参数,用于指定超时时间。
返回一个 bool变量,如果为 false则没有接收到任何消息。
等待所有线程结束:thread_joinAll()。
一般来说放在需要放的地方……即可。
数据共享终于讲到这里了。
我们先考虑一个程序:
import std.stdio;import std.concurrency;import core.thread;int variable;void printInfo(string message) {    writefln("%s: %s (@%s)", message, variable, &variable);}void worker() {    variable = 42;    printInfo("Before the worker is terminated");}void main() {    spawn(&worker);    thread_joinAll();    printInfo("After the worker is terminated");}其输出是这样的:
Before the worker is terminated: 42 (@7F308C88C530)After the worker is terminated: 0 (@7F308C98D730)可以发现,同样的变量在不同的线程里面地址是不一样的,也就是说数据是独立的,所以要有共享。
此时我们只需要修改:
shared int variable;即可。
实际上写为
shared(int) variable;会更标准,但是好麻烦……
当然,不得不说,有了消息传递,那么数据共享就是备用的方案了。
Data Race数据竞争是一个很常见的问题。
例子:
void worker(shared int* i) {    foreach (t; 0 .. 200000) {        *i = *i + 1;    }}void main() {    shared int i = 0;    foreach (id; 0 .. 10) {        spawn(&worker, &i);    }    thread_joinAll();    writeln("after i to ", i);}期望输出 2000000,但是实际输出可能远小于此。
所以我们要考虑同步:
void worker(shared int* i) {    foreach (t; 0 .. 200000) {        synchronized {            *i = *i + 1;        }    }}解释:
synchronized会隐式地创建一个锁,保证只有一个线程会持有这个锁,并且执行这些操作。
有些时候,synchronized会使得因为等待锁的额外开销使得程序变慢。但有些时候,我们可以通过更好的方法避免等待的开销,例如使用原子操作。
synchronized创建的锁只会对于这一个代码块生效,不会影响到其他的代码块。
void increase(shared int* i) {    foreach (t; 0 .. 200000) {        synchronized {            *i = *i + 1;        }    }}void decrese(shared int* i) {    foreach (t; 0 .. 200000) {        synchronized {            *i = *i - 1;        }    }}void main() {    shared int i = 0;    foreach (id; 0 .. 10) {        if (id & 1) spawn(&increase, &i);        else spawn(&decrese, &i);    }    thread_joinAll();    writeln("after i to ", i);}期望输出 0但是实际输出……不知道。所以我们需要共用锁:
synchronized (lock_object) {    // ...}修改后的代码:
class Lock {}shared Lock lock = new Lock();void increase(shared int* i) {    foreach (t; 0 .. 200000) {        synchronized (lock) {            *i = *i + 1;        }    }}void decrese(shared int* i) {    foreach (t; 0 .. 200000) {        synchronized (lock) {            *i = *i - 1;        }    }}现在就可以得到正确的答案了。
同步类我们可以使用 synchronized修饰一个类。这相当于在每一个代码块里面嵌套一个 synchronzied:
synchronized class Cls {    void func() {        // ...    }}上面的等价于:
class Cls {    void func() {        synchronized (this) {            // ...        }    }}同步初始化我们考虑这份代码:
static this() {    writeln("executing static this()");}void worker() {}void main() {    spawn(&worker);    thread_joinAll();}最终会输出两次 executing static this()。
如果我们修改为 shared static this() { ... },那么最终只会输出一次。
需要用到
core.atomic库。
有代码:
atomic!"+="(var, x);atomic!"-="(var, x);// ... like *= /= ^= ...这些都是原子操作。
有方法:
shared(int) *value;bool is_mutated = cas(value, currentValue, newValue);如果返回 true,那么值会改变,否则没有。
原子操作一般来说快于
synchronized。同时,原子操作也可以作用于结构体上,这里不作为讲解。
更多操作可以参考标准库:
core.sync.barrier
core.sync.condition
core.sync.config
core.sync.exception
core.sync.mutex
core.sync.rwmutex
core.sync.semaphore
标签:
上一篇: 安徽超链网络科技有限公司_关于安徽超链网络科技有限公司介绍 全球热推荐
下一篇: 最后一页